HDFS-9414. Refactor reconfiguration of ClientDatanodeProtocol for reusability. (Contributed by Xiaobing Zhou)

This commit is contained in:
Arpit Agarwal 2015-12-04 20:24:08 -08:00
parent ce49ba39d2
commit 71be31201a
13 changed files with 579 additions and 120 deletions

View File

@ -117,6 +117,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<include>hdfs.proto</include> <include>hdfs.proto</include>
<include>encryption.proto</include> <include>encryption.proto</include>
<include>inotify.proto</include> <include>inotify.proto</include>
<include>ReconfigurationProtocol.proto</include>
</includes> </includes>
</source> </source>
<output>${project.build.directory}/generated-sources/java</output> <output>${project.build.directory}/generated-sources/java</output>

View File

@ -0,0 +1,36 @@
package org.apache.hadoop.hdfs.protocol;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
/**********************************************************************
* ReconfigurationProtocol is used by HDFS admin to reload configuration
* for NN/DN without restarting them.
**********************************************************************/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface ReconfigurationProtocol {
long versionID = 1L;
/**
* Asynchronously reload configuration on disk and apply changes.
*/
void startReconfiguration() throws IOException;
/**
* Get the status of the previously issued reconfig task.
* @see {@link org.apache.hadoop.conf.ReconfigurationTaskStatus}.
*/
ReconfigurationTaskStatus getReconfigurationStatus() throws IOException;
/**
* Get a list of allowed properties for reconfiguration.
*/
List<String> listReconfigurableProperties() throws IOException;
}

View File

@ -22,17 +22,13 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import javax.net.SocketFactory; import javax.net.SocketFactory;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationTaskStatus; import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@ -52,14 +48,12 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtobufHelper;
@ -316,39 +310,19 @@ public void startReconfiguration() throws IOException {
@Override @Override
public ReconfigurationTaskStatus getReconfigurationStatus() public ReconfigurationTaskStatus getReconfigurationStatus()
throws IOException { throws IOException {
GetReconfigurationStatusResponseProto response;
Map<PropertyChange, Optional<String>> statusMap = null;
long startTime;
long endTime = 0;
try { try {
response = rpcProxy.getReconfigurationStatus(NULL_CONTROLLER, return ReconfigurationProtocolUtils.getReconfigurationStatus(
VOID_GET_RECONFIG_STATUS); rpcProxy
startTime = response.getStartTime(); .getReconfigurationStatus(
if (response.hasEndTime()) { NULL_CONTROLLER,
endTime = response.getEndTime(); VOID_GET_RECONFIG_STATUS));
}
if (response.getChangesCount() > 0) {
statusMap = Maps.newHashMap();
for (GetReconfigurationStatusConfigChangeProto change :
response.getChangesList()) {
PropertyChange pc = new PropertyChange(
change.getName(), change.getNewValue(), change.getOldValue());
String errorMessage = null;
if (change.hasErrorMessage()) {
errorMessage = change.getErrorMessage();
}
statusMap.put(pc, Optional.fromNullable(errorMessage));
}
}
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
} }
return new ReconfigurationTaskStatus(startTime, endTime, statusMap);
} }
@Override @Override
public List<String> listReconfigurableProperties() public List<String> listReconfigurableProperties() throws IOException {
throws IOException {
ListReconfigurablePropertiesResponseProto response; ListReconfigurablePropertiesResponseProto response;
try { try {
response = rpcProxy.listReconfigurableProperties(NULL_CONTROLLER, response = rpcProxy.listReconfigurableProperties(NULL_CONTROLLER,

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
@KerberosInfo(serverPrincipal =
CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
@ProtocolInfo(
protocolName = "org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol",
protocolVersion = 1)
@InterfaceAudience.Public
@InterfaceStability.Evolving
/**
* Protocol that clients use to communicate with the NN/DN to do
* reconfiguration on the fly.
*
* Note: This extends the protocolbuffer service based interface to
* add annotations required for security.
*/
public interface ReconfigurationProtocolPB extends
ReconfigurationProtocolService.BlockingInterface {
}

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import javax.net.SocketFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationRequestProto;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* This class is the client side translator to translate the requests made on
* {@link ReconfigurationProtocol} interfaces to the RPC server implementing
* {@link ReconfigurationProtocolPB}.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public class ReconfigurationProtocolTranslatorPB implements
ProtocolMetaInterface, ReconfigurationProtocol, ProtocolTranslator,
Closeable {
public static final Logger LOG = LoggerFactory
.getLogger(ReconfigurationProtocolTranslatorPB.class);
private static final RpcController NULL_CONTROLLER = null;
private static final StartReconfigurationRequestProto VOID_START_RECONFIG =
StartReconfigurationRequestProto.newBuilder().build();
private static final ListReconfigurablePropertiesRequestProto
VOID_LIST_RECONFIGURABLE_PROPERTIES =
ListReconfigurablePropertiesRequestProto.newBuilder().build();
private static final GetReconfigurationStatusRequestProto
VOID_GET_RECONFIG_STATUS =
GetReconfigurationStatusRequestProto.newBuilder().build();
private final ReconfigurationProtocolPB rpcProxy;
public ReconfigurationProtocolTranslatorPB(InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory)
throws IOException {
rpcProxy = createReconfigurationProtocolProxy(addr, ticket, conf, factory,
0);
}
static ReconfigurationProtocolPB createReconfigurationProtocolProxy(
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int socketTimeout) throws IOException {
RPC.setProtocolEngine(conf, ReconfigurationProtocolPB.class,
ProtobufRpcEngine.class);
return RPC.getProxy(ReconfigurationProtocolPB.class,
RPC.getProtocolVersion(ReconfigurationProtocolPB.class),
addr, ticket, conf, factory, socketTimeout);
}
@Override
public void close() throws IOException {
RPC.stopProxy(rpcProxy);
}
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
}
@Override
public void startReconfiguration() throws IOException {
try {
rpcProxy.startReconfiguration(NULL_CONTROLLER, VOID_START_RECONFIG);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public ReconfigurationTaskStatus getReconfigurationStatus()
throws IOException {
try {
return ReconfigurationProtocolUtils.getReconfigurationStatus(
rpcProxy
.getReconfigurationStatus(
NULL_CONTROLLER,
VOID_GET_RECONFIG_STATUS));
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public List<String> listReconfigurableProperties() throws IOException {
ListReconfigurablePropertiesResponseProto response;
try {
response = rpcProxy.listReconfigurableProperties(NULL_CONTROLLER,
VOID_LIST_RECONFIGURABLE_PROPERTIES);
return response.getNameList();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public boolean isMethodSupported(String methodName) throws IOException {
return RpcClientUtil.isMethodSupported(rpcProxy,
ReconfigurationProtocolPB.class,
RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(ReconfigurationProtocolPB.class),
methodName);
}
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.util.Map;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusConfigChangeProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusResponseProto;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
/**
* This is a client side utility class that handles
* common logic to to parameter reconfiguration.
*/
public class ReconfigurationProtocolUtils {
private ReconfigurationProtocolUtils() {
}
public static ReconfigurationTaskStatus getReconfigurationStatus(
GetReconfigurationStatusResponseProto response) {
Map<PropertyChange, Optional<String>> statusMap = null;
long startTime;
long endTime = 0;
startTime = response.getStartTime();
if (response.hasEndTime()) {
endTime = response.getEndTime();
}
if (response.getChangesCount() > 0) {
statusMap = Maps.newHashMap();
for (GetReconfigurationStatusConfigChangeProto change : response
.getChangesList()) {
PropertyChange pc = new PropertyChange(change.getName(),
change.getNewValue(), change.getOldValue());
String errorMessage = null;
if (change.hasErrorMessage()) {
errorMessage = change.getErrorMessage();
}
statusMap.put(pc, Optional.fromNullable(errorMessage));
}
}
return new ReconfigurationTaskStatus(startTime, endTime, statusMap);
}
}

View File

@ -33,6 +33,7 @@ package hadoop.hdfs;
import "Security.proto"; import "Security.proto";
import "hdfs.proto"; import "hdfs.proto";
import "ReconfigurationProtocol.proto";
/** /**
* block - block for which visible length is requested * block - block for which visible length is requested
@ -149,12 +150,6 @@ message GetDatanodeInfoResponseProto {
required DatanodeLocalInfoProto localInfo = 1; required DatanodeLocalInfoProto localInfo = 1;
} }
/** Asks DataNode to reload configuration file. */
message StartReconfigurationRequestProto {
}
message StartReconfigurationResponseProto {
}
message TriggerBlockReportRequestProto { message TriggerBlockReportRequestProto {
required bool incremental = 1; required bool incremental = 1;
@ -163,31 +158,6 @@ message TriggerBlockReportRequestProto {
message TriggerBlockReportResponseProto { message TriggerBlockReportResponseProto {
} }
/** Query the running status of reconfiguration process */
message GetReconfigurationStatusRequestProto {
}
message GetReconfigurationStatusConfigChangeProto {
required string name = 1;
required string oldValue = 2;
optional string newValue = 3;
optional string errorMessage = 4; // It is empty if success.
}
message GetReconfigurationStatusResponseProto {
required int64 startTime = 1;
optional int64 endTime = 2;
repeated GetReconfigurationStatusConfigChangeProto changes = 3;
}
message ListReconfigurablePropertiesRequestProto {
}
/** Query the reconfigurable properties on DataNode. */
message ListReconfigurablePropertiesResponseProto {
repeated string name = 1;
}
message GetBalancerBandwidthRequestProto { message GetBalancerBandwidthRequestProto {
} }

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// This file contains protocol buffers that are used to reconfigure NameNode
// and DataNode by HDFS admin.
option java_package = "org.apache.hadoop.hdfs.protocol.proto";
option java_outer_classname = "ReconfigurationProtocolProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.hdfs;
/** Asks NN/DN to reload configuration file. */
message StartReconfigurationRequestProto {
}
message StartReconfigurationResponseProto {
}
/** Query the running status of reconfiguration process */
message GetReconfigurationStatusRequestProto {
}
message GetReconfigurationStatusConfigChangeProto {
required string name = 1;
required string oldValue = 2;
optional string newValue = 3;
optional string errorMessage = 4; // It is empty if success.
}
message GetReconfigurationStatusResponseProto {
required int64 startTime = 1;
optional int64 endTime = 2;
repeated GetReconfigurationStatusConfigChangeProto changes = 3;
}
/** Query the reconfigurable properties on NN/DN. */
message ListReconfigurablePropertiesRequestProto {
}
message ListReconfigurablePropertiesResponseProto {
repeated string name = 1;
}
/**
* Protocol used from client to the NN/DN.
* See the request and response for details of rpc call.
*/
service ReconfigurationProtocolService {
rpc getReconfigurationStatus(GetReconfigurationStatusRequestProto)
returns(GetReconfigurationStatusResponseProto);
rpc startReconfiguration(StartReconfigurationRequestProto)
returns(StartReconfigurationResponseProto);
rpc listReconfigurableProperties(
ListReconfigurablePropertiesRequestProto)
returns(ListReconfigurablePropertiesResponseProto);
}

View File

@ -7,11 +7,15 @@ Release 2.9.0 - UNRELEASED
NEW FEATURES NEW FEATURES
IMPROVEMENTS IMPROVEMENTS
HDFS-9267. TestDiskError should get stored replicas through
FsDatasetTestUtils. (Lei (Eddy) Xu via Colin P. McCabe)
HDFS-9491. Tests should get the number of pending async delets via HDFS-9267. TestDiskError should get stored replicas through
FsDatasetTestUtils. (Tony Wu via lei) FsDatasetTestUtils. (Lei (Eddy) Xu via Colin P. McCabe)
HDFS-9491. Tests should get the number of pending async delets via
FsDatasetTestUtils. (Tony Wu via lei)
HDFS-9414. Refactor reconfiguration of ClientDatanodeProtocol for
reusability. (Xiaobing Zhou via Arpit Agarwal)
OPTIMIZATIONS OPTIMIZATIONS

View File

@ -22,10 +22,7 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.google.common.base.Optional;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@ -38,22 +35,21 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto.Builder; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportResponseProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@ -205,7 +201,7 @@ public GetDatanodeInfoResponseProto getDatanodeInfo(RpcController unused,
@Override @Override
public StartReconfigurationResponseProto startReconfiguration( public StartReconfigurationResponseProto startReconfiguration(
RpcController unused, StartReconfigurationRequestProto request) RpcController unused, StartReconfigurationRequestProto request)
throws ServiceException { throws ServiceException {
try { try {
impl.startReconfiguration(); impl.startReconfiguration();
} catch (IOException e) { } catch (IOException e) {
@ -216,54 +212,27 @@ public StartReconfigurationResponseProto startReconfiguration(
@Override @Override
public ListReconfigurablePropertiesResponseProto listReconfigurableProperties( public ListReconfigurablePropertiesResponseProto listReconfigurableProperties(
RpcController controller, RpcController controller,
ListReconfigurablePropertiesRequestProto request) ListReconfigurablePropertiesRequestProto request)
throws ServiceException { throws ServiceException {
ListReconfigurablePropertiesResponseProto.Builder builder =
ListReconfigurablePropertiesResponseProto.newBuilder();
try { try {
for (String name : impl.listReconfigurableProperties()) { return ReconfigurationProtocolServerSideUtils
builder.addName(name); .listReconfigurableProperties(impl.listReconfigurableProperties());
}
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }
return builder.build();
} }
@Override @Override
public GetReconfigurationStatusResponseProto getReconfigurationStatus( public GetReconfigurationStatusResponseProto getReconfigurationStatus(
RpcController unused, GetReconfigurationStatusRequestProto request) RpcController unused, GetReconfigurationStatusRequestProto request)
throws ServiceException { throws ServiceException {
GetReconfigurationStatusResponseProto.Builder builder =
GetReconfigurationStatusResponseProto.newBuilder();
try { try {
ReconfigurationTaskStatus status = impl.getReconfigurationStatus(); return ReconfigurationProtocolServerSideUtils
builder.setStartTime(status.getStartTime()); .getReconfigurationStatus(impl.getReconfigurationStatus());
if (status.stopped()) {
builder.setEndTime(status.getEndTime());
assert status.getStatus() != null;
for (Map.Entry<PropertyChange, Optional<String>> result :
status.getStatus().entrySet()) {
GetReconfigurationStatusConfigChangeProto.Builder changeBuilder =
GetReconfigurationStatusConfigChangeProto.newBuilder();
PropertyChange change = result.getKey();
changeBuilder.setName(change.prop);
changeBuilder.setOldValue(change.oldVal != null ? change.oldVal : "");
if (change.newVal != null) {
changeBuilder.setNewValue(change.newVal);
}
if (result.getValue().isPresent()) {
// Get full stack trace.
changeBuilder.setErrorMessage(result.getValue().get());
}
builder.addChanges(changeBuilder);
}
}
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }
return builder.build();
} }
@Override @Override

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationResponseProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* This class is used on the server side. Calls come across the wire for the
* for protocol {@link ReconfigurationProtocolPB}.
* This class translates the PB data types
* to the native data types used inside the NN/DN as specified in the generic
* ReconfigurationProtocol.
*/
public class ReconfigurationProtocolServerSideTranslatorPB implements
ReconfigurationProtocolPB {
private final ReconfigurationProtocol impl;
private static final StartReconfigurationResponseProto START_RECONFIG_RESP =
StartReconfigurationResponseProto.newBuilder().build();
public ReconfigurationProtocolServerSideTranslatorPB(
ReconfigurationProtocol impl) {
this.impl = impl;
}
@Override
public StartReconfigurationResponseProto startReconfiguration(
RpcController controller, StartReconfigurationRequestProto request)
throws ServiceException {
try {
impl.startReconfiguration();
} catch (IOException e) {
throw new ServiceException(e);
}
return START_RECONFIG_RESP;
}
@Override
public ListReconfigurablePropertiesResponseProto listReconfigurableProperties(
RpcController controller,
ListReconfigurablePropertiesRequestProto request)
throws ServiceException {
try {
return ReconfigurationProtocolServerSideUtils
.listReconfigurableProperties(impl.listReconfigurableProperties());
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public GetReconfigurationStatusResponseProto getReconfigurationStatus(
RpcController unused, GetReconfigurationStatusRequestProto request)
throws ServiceException {
try {
return ReconfigurationProtocolServerSideUtils
.getReconfigurationStatus(impl.getReconfigurationStatus());
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusConfigChangeProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesResponseProto;
import com.google.common.base.Optional;
/**
* This is a server side utility class that handles
* common logic to to parameter reconfiguration.
*/
public class ReconfigurationProtocolServerSideUtils {
private ReconfigurationProtocolServerSideUtils() {
}
public static ListReconfigurablePropertiesResponseProto
listReconfigurableProperties(
List<String> reconfigurableProperties) {
ListReconfigurablePropertiesResponseProto.Builder builder =
ListReconfigurablePropertiesResponseProto.newBuilder();
for (String name : reconfigurableProperties) {
builder.addName(name);
}
return builder.build();
}
public static GetReconfigurationStatusResponseProto getReconfigurationStatus(
ReconfigurationTaskStatus status) {
GetReconfigurationStatusResponseProto.Builder builder =
GetReconfigurationStatusResponseProto.newBuilder();
builder.setStartTime(status.getStartTime());
if (status.stopped()) {
builder.setEndTime(status.getEndTime());
assert status.getStatus() != null;
for (Map.Entry<PropertyChange, Optional<String>> result : status
.getStatus().entrySet()) {
GetReconfigurationStatusConfigChangeProto.Builder changeBuilder =
GetReconfigurationStatusConfigChangeProto.newBuilder();
PropertyChange change = result.getKey();
changeBuilder.setName(change.prop);
changeBuilder.setOldValue(change.oldVal != null ? change.oldVal : "");
if (change.newVal != null) {
changeBuilder.setNewValue(change.newVal);
}
if (result.getValue().isPresent()) {
// Get full stack trace.
changeBuilder.setErrorMessage(result.getValue().get());
}
builder.addChanges(changeBuilder);
}
}
return builder.build();
}
}

View File

@ -49,6 +49,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY;
import static org.apache.hadoop.util.ExitUtil.terminate; import static org.apache.hadoop.util.ExitUtil.terminate;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
@ -126,6 +127,7 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
@ -145,6 +147,8 @@
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode;
@ -252,7 +256,7 @@
@InterfaceAudience.Private @InterfaceAudience.Private
public class DataNode extends ReconfigurableBase public class DataNode extends ReconfigurableBase
implements InterDatanodeProtocol, ClientDatanodeProtocol, implements InterDatanodeProtocol, ClientDatanodeProtocol,
TraceAdminProtocol, DataNodeMXBean { TraceAdminProtocol, DataNodeMXBean, ReconfigurationProtocol {
public static final Logger LOG = LoggerFactory.getLogger(DataNode.class); public static final Logger LOG = LoggerFactory.getLogger(DataNode.class);
static{ static{
@ -919,7 +923,14 @@ private void initIpcServer(Configuration conf) throws IOException {
conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false) DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
.setSecretManager(blockPoolTokenSecretManager).build(); .setSecretManager(blockPoolTokenSecretManager).build();
ReconfigurationProtocolServerSideTranslatorPB reconfigurationProtocolXlator
= new ReconfigurationProtocolServerSideTranslatorPB(this);
service = ReconfigurationProtocolService
.newReflectiveBlockingService(reconfigurationProtocolXlator);
DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class, service,
ipcServer);
InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator = InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator =
new InterDatanodeProtocolServerSideTranslatorPB(this); new InterDatanodeProtocolServerSideTranslatorPB(this);
service = InterDatanodeProtocolService service = InterDatanodeProtocolService
@ -2907,19 +2918,19 @@ public DatanodeLocalInfo getDatanodeInfo() {
confVersion, uptime); confVersion, uptime);
} }
@Override // ClientDatanodeProtocol @Override // ClientDatanodeProtocol & ReconfigurationProtocol
public void startReconfiguration() throws IOException { public void startReconfiguration() throws IOException {
checkSuperuserPrivilege(); checkSuperuserPrivilege();
startReconfigurationTask(); startReconfigurationTask();
} }
@Override // ClientDatanodeProtocol @Override // ClientDatanodeProtocol & ReconfigurationProtocol
public ReconfigurationTaskStatus getReconfigurationStatus() throws IOException { public ReconfigurationTaskStatus getReconfigurationStatus() throws IOException {
checkSuperuserPrivilege(); checkSuperuserPrivilege();
return getReconfigurationTaskStatus(); return getReconfigurationTaskStatus();
} }
@Override // ClientDatanodeProtocol @Override // ClientDatanodeProtocol & ReconfigurationProtocol
public List<String> listReconfigurableProperties() public List<String> listReconfigurableProperties()
throws IOException { throws IOException {
return RECONFIGURABLE_PROPERTIES; return RECONFIGURABLE_PROPERTIES;