HDFS-9414. Refactor reconfiguration of ClientDatanodeProtocol for reusability. (Contributed by Xiaobing Zhou)

This commit is contained in:
Arpit Agarwal 2015-12-04 20:24:08 -08:00
parent 4265a85f6d
commit 86c95cb31a
13 changed files with 579 additions and 121 deletions

View File

@ -118,6 +118,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<include>encryption.proto</include>
<include>inotify.proto</include>
<include>erasurecoding.proto</include>
<include>ReconfigurationProtocol.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>

View File

@ -0,0 +1,36 @@
package org.apache.hadoop.hdfs.protocol;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
/**********************************************************************
 * ReconfigurationProtocol is used by HDFS admin to reload configuration
 * for NN/DN without restarting them.
 **********************************************************************/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface ReconfigurationProtocol {

  // Protocol version; bump only on incompatible changes to this interface.
  long versionID = 1L;

  /**
   * Asynchronously reload configuration on disk and apply changes.
   *
   * @throws IOException if the reconfiguration task cannot be started
   */
  void startReconfiguration() throws IOException;

  /**
   * Get the status of the previously issued reconfig task.
   *
   * @return the current {@link org.apache.hadoop.conf.ReconfigurationTaskStatus}
   * @throws IOException on RPC or server-side failure
   */
  ReconfigurationTaskStatus getReconfigurationStatus() throws IOException;

  /**
   * Get a list of allowed properties for reconfiguration.
   *
   * @return names of the properties that may be changed at runtime
   * @throws IOException on RPC or server-side failure
   */
  List<String> listReconfigurableProperties() throws IOException;
}

View File

@ -21,17 +21,13 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import javax.net.SocketFactory;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@ -48,14 +44,12 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlo
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.ipc.ProtobufHelper;
@ -273,39 +267,19 @@ public class ClientDatanodeProtocolTranslatorPB implements
@Override
public ReconfigurationTaskStatus getReconfigurationStatus()
throws IOException {
GetReconfigurationStatusResponseProto response;
Map<PropertyChange, Optional<String>> statusMap = null;
long startTime;
long endTime = 0;
try {
response = rpcProxy.getReconfigurationStatus(NULL_CONTROLLER,
VOID_GET_RECONFIG_STATUS);
startTime = response.getStartTime();
if (response.hasEndTime()) {
endTime = response.getEndTime();
}
if (response.getChangesCount() > 0) {
statusMap = Maps.newHashMap();
for (GetReconfigurationStatusConfigChangeProto change :
response.getChangesList()) {
PropertyChange pc = new PropertyChange(
change.getName(), change.getNewValue(), change.getOldValue());
String errorMessage = null;
if (change.hasErrorMessage()) {
errorMessage = change.getErrorMessage();
}
statusMap.put(pc, Optional.fromNullable(errorMessage));
}
}
return ReconfigurationProtocolUtils.getReconfigurationStatus(
rpcProxy
.getReconfigurationStatus(
NULL_CONTROLLER,
VOID_GET_RECONFIG_STATUS));
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
return new ReconfigurationTaskStatus(startTime, endTime, statusMap);
}
@Override
public List<String> listReconfigurableProperties()
throws IOException {
public List<String> listReconfigurableProperties() throws IOException {
ListReconfigurablePropertiesResponseProto response;
try {
response = rpcProxy.listReconfigurableProperties(NULL_CONTROLLER,

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
/**
 * Protocol that clients use to communicate with the NN/DN to do
 * reconfiguration on the fly.
 *
 * Note: This extends the protocolbuffer service based interface to
 * add annotations required for security.
 */
@KerberosInfo(serverPrincipal =
    CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
@ProtocolInfo(
    protocolName = "org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol",
    protocolVersion = 1)
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface ReconfigurationProtocolPB extends
    ReconfigurationProtocolService.BlockingInterface {
}

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import javax.net.SocketFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationRequestProto;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
 * Client-side translator: forwards calls made on the
 * {@link ReconfigurationProtocol} interface to an RPC server implementing
 * {@link ReconfigurationProtocolPB}.
 */
@InterfaceAudience.Private
@InterfaceStability.Stable
public class ReconfigurationProtocolTranslatorPB implements
    ProtocolMetaInterface, ReconfigurationProtocol, ProtocolTranslator,
    Closeable {
  public static final Logger LOG = LoggerFactory
      .getLogger(ReconfigurationProtocolTranslatorPB.class);

  private static final RpcController NULL_CONTROLLER = null;

  // Reusable empty request singletons; none of these RPCs carry a payload.
  private static final StartReconfigurationRequestProto VOID_START_RECONFIG =
      StartReconfigurationRequestProto.newBuilder().build();
  private static final ListReconfigurablePropertiesRequestProto
      VOID_LIST_RECONFIGURABLE_PROPERTIES =
      ListReconfigurablePropertiesRequestProto.newBuilder().build();
  private static final GetReconfigurationStatusRequestProto
      VOID_GET_RECONFIG_STATUS =
      GetReconfigurationStatusRequestProto.newBuilder().build();

  // Protobuf RPC proxy every call is delegated to.
  private final ReconfigurationProtocolPB rpcProxy;

  public ReconfigurationProtocolTranslatorPB(InetSocketAddress addr,
      UserGroupInformation ticket, Configuration conf, SocketFactory factory)
      throws IOException {
    // A socket timeout of 0 means the IPC layer's default is used.
    rpcProxy = createReconfigurationProtocolProxy(addr, ticket, conf, factory,
        0);
  }

  static ReconfigurationProtocolPB createReconfigurationProtocolProxy(
      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
      SocketFactory factory, int socketTimeout) throws IOException {
    RPC.setProtocolEngine(conf, ReconfigurationProtocolPB.class,
        ProtobufRpcEngine.class);
    return RPC.getProxy(ReconfigurationProtocolPB.class,
        RPC.getProtocolVersion(ReconfigurationProtocolPB.class),
        addr, ticket, conf, factory, socketTimeout);
  }

  @Override
  public void close() throws IOException {
    RPC.stopProxy(rpcProxy);
  }

  @Override
  public Object getUnderlyingProxyObject() {
    return rpcProxy;
  }

  @Override
  public void startReconfiguration() throws IOException {
    try {
      rpcProxy.startReconfiguration(NULL_CONTROLLER, VOID_START_RECONFIG);
    } catch (ServiceException e) {
      // Unwrap the remote IOException carried inside the ServiceException.
      throw ProtobufHelper.getRemoteException(e);
    }
  }

  @Override
  public ReconfigurationTaskStatus getReconfigurationStatus()
      throws IOException {
    try {
      return ReconfigurationProtocolUtils.getReconfigurationStatus(
          rpcProxy.getReconfigurationStatus(NULL_CONTROLLER,
              VOID_GET_RECONFIG_STATUS));
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
  }

  @Override
  public List<String> listReconfigurableProperties() throws IOException {
    try {
      return rpcProxy.listReconfigurableProperties(NULL_CONTROLLER,
          VOID_LIST_RECONFIGURABLE_PROPERTIES).getNameList();
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
  }

  @Override
  public boolean isMethodSupported(String methodName) throws IOException {
    return RpcClientUtil.isMethodSupported(rpcProxy,
        ReconfigurationProtocolPB.class,
        RPC.RpcKind.RPC_PROTOCOL_BUFFER,
        RPC.getProtocolVersion(ReconfigurationProtocolPB.class),
        methodName);
  }
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.util.Map;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusConfigChangeProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusResponseProto;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
/**
* This is a client side utility class that handles
* common logic to to parameter reconfiguration.
*/
public class ReconfigurationProtocolUtils {
private ReconfigurationProtocolUtils() {
}
public static ReconfigurationTaskStatus getReconfigurationStatus(
GetReconfigurationStatusResponseProto response) {
Map<PropertyChange, Optional<String>> statusMap = null;
long startTime;
long endTime = 0;
startTime = response.getStartTime();
if (response.hasEndTime()) {
endTime = response.getEndTime();
}
if (response.getChangesCount() > 0) {
statusMap = Maps.newHashMap();
for (GetReconfigurationStatusConfigChangeProto change : response
.getChangesList()) {
PropertyChange pc = new PropertyChange(change.getName(),
change.getNewValue(), change.getOldValue());
String errorMessage = null;
if (change.hasErrorMessage()) {
errorMessage = change.getErrorMessage();
}
statusMap.put(pc, Optional.fromNullable(errorMessage));
}
}
return new ReconfigurationTaskStatus(startTime, endTime, statusMap);
}
}

View File

@ -33,6 +33,7 @@ package hadoop.hdfs;
import "Security.proto";
import "hdfs.proto";
import "ReconfigurationProtocol.proto";
/**
* block - block for which visible length is requested
@ -123,12 +124,6 @@ message GetDatanodeInfoResponseProto {
required DatanodeLocalInfoProto localInfo = 1;
}
/** Asks DataNode to reload configuration file. */
message StartReconfigurationRequestProto {
}
message StartReconfigurationResponseProto {
}
message TriggerBlockReportRequestProto {
required bool incremental = 1;
@ -137,31 +132,6 @@ message TriggerBlockReportRequestProto {
message TriggerBlockReportResponseProto {
}
/** Query the running status of reconfiguration process */
message GetReconfigurationStatusRequestProto {
}
message GetReconfigurationStatusConfigChangeProto {
required string name = 1;
required string oldValue = 2;
optional string newValue = 3;
optional string errorMessage = 4; // It is empty if success.
}
message GetReconfigurationStatusResponseProto {
required int64 startTime = 1;
optional int64 endTime = 2;
repeated GetReconfigurationStatusConfigChangeProto changes = 3;
}
message ListReconfigurablePropertiesRequestProto {
}
/** Query the reconfigurable properties on DataNode. */
message ListReconfigurablePropertiesResponseProto {
repeated string name = 1;
}
message GetBalancerBandwidthRequestProto {
}

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// This file contains protocol buffers that are used to reconfigure NameNode
// and DataNode by HDFS admin.
option java_package = "org.apache.hadoop.hdfs.protocol.proto";
option java_outer_classname = "ReconfigurationProtocolProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.hdfs;

/** Asks NN/DN to reload configuration file. */
message StartReconfigurationRequestProto {
}

/** Empty acknowledgement; the call only signals success or failure. */
message StartReconfigurationResponseProto {
}

/** Query the running status of reconfiguration process */
message GetReconfigurationStatusRequestProto {
}

/** One attempted property change and its outcome. */
message GetReconfigurationStatusConfigChangeProto {
  required string name = 1;
  required string oldValue = 2;
  optional string newValue = 3;
  optional string errorMessage = 4;  // It is empty if success.
}

/** Start/end timestamps plus the per-property change results. */
message GetReconfigurationStatusResponseProto {
  required int64 startTime = 1;
  optional int64 endTime = 2;       // unset while the task is still running
  repeated GetReconfigurationStatusConfigChangeProto changes = 3;
}

/** Query the reconfigurable properties on NN/DN. */
message ListReconfigurablePropertiesRequestProto {
}

/** Names of the properties that may be reconfigured at runtime. */
message ListReconfigurablePropertiesResponseProto {
  repeated string name = 1;
}

/**
 * Protocol used from client to the NN/DN.
 * See the request and response for details of rpc call.
 */
service ReconfigurationProtocolService {
  rpc getReconfigurationStatus(GetReconfigurationStatusRequestProto)
      returns(GetReconfigurationStatusResponseProto);

  rpc startReconfiguration(StartReconfigurationRequestProto)
      returns(StartReconfigurationResponseProto);

  rpc listReconfigurableProperties(
      ListReconfigurablePropertiesRequestProto)
      returns(ListReconfigurablePropertiesResponseProto);
}

View File

@ -883,11 +883,15 @@ Release 2.9.0 - UNRELEASED
NEW FEATURES
IMPROVEMENTS
HDFS-9267. TestDiskError should get stored replicas through
FsDatasetTestUtils. (Lei (Eddy) Xu via Colin P. McCabe)
HDFS-9491. Tests should get the number of pending async deletes via
FsDatasetTestUtils. (Tony Wu via lei)
HDFS-9267. TestDiskError should get stored replicas through
FsDatasetTestUtils. (Lei (Eddy) Xu via Colin P. McCabe)
HDFS-9491. Tests should get the number of pending async deletes via
FsDatasetTestUtils. (Tony Wu via lei)
HDFS-9414. Refactor reconfiguration of ClientDatanodeProtocol for
reusability. (Xiaobing Zhou via Arpit Agarwal)
OPTIMIZATIONS

View File

@ -18,12 +18,8 @@
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import java.util.Map;
import com.google.common.base.Optional;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@ -35,19 +31,18 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlo
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportResponseProto;
@ -162,7 +157,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
@Override
public StartReconfigurationResponseProto startReconfiguration(
RpcController unused, StartReconfigurationRequestProto request)
throws ServiceException {
throws ServiceException {
try {
impl.startReconfiguration();
} catch (IOException e) {
@ -173,54 +168,27 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
@Override
public ListReconfigurablePropertiesResponseProto listReconfigurableProperties(
RpcController controller,
ListReconfigurablePropertiesRequestProto request)
RpcController controller,
ListReconfigurablePropertiesRequestProto request)
throws ServiceException {
ListReconfigurablePropertiesResponseProto.Builder builder =
ListReconfigurablePropertiesResponseProto.newBuilder();
try {
for (String name : impl.listReconfigurableProperties()) {
builder.addName(name);
}
return ReconfigurationProtocolServerSideUtils
.listReconfigurableProperties(impl.listReconfigurableProperties());
} catch (IOException e) {
throw new ServiceException(e);
}
return builder.build();
}
@Override
public GetReconfigurationStatusResponseProto getReconfigurationStatus(
RpcController unused, GetReconfigurationStatusRequestProto request)
throws ServiceException {
GetReconfigurationStatusResponseProto.Builder builder =
GetReconfigurationStatusResponseProto.newBuilder();
try {
ReconfigurationTaskStatus status = impl.getReconfigurationStatus();
builder.setStartTime(status.getStartTime());
if (status.stopped()) {
builder.setEndTime(status.getEndTime());
assert status.getStatus() != null;
for (Map.Entry<PropertyChange, Optional<String>> result :
status.getStatus().entrySet()) {
GetReconfigurationStatusConfigChangeProto.Builder changeBuilder =
GetReconfigurationStatusConfigChangeProto.newBuilder();
PropertyChange change = result.getKey();
changeBuilder.setName(change.prop);
changeBuilder.setOldValue(change.oldVal != null ? change.oldVal : "");
if (change.newVal != null) {
changeBuilder.setNewValue(change.newVal);
}
if (result.getValue().isPresent()) {
// Get full stack trace.
changeBuilder.setErrorMessage(result.getValue().get());
}
builder.addChanges(changeBuilder);
}
}
return ReconfigurationProtocolServerSideUtils
.getReconfigurationStatus(impl.getReconfigurationStatus());
} catch (IOException e) {
throw new ServiceException(e);
}
return builder.build();
}
@Override

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationResponseProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
 * This class is used on the server side. Calls come across the wire for
 * the protocol {@link ReconfigurationProtocolPB}.
 * This class translates the PB data types
 * to the native data types used inside the NN/DN as specified in the generic
 * ReconfigurationProtocol.
 */
public class ReconfigurationProtocolServerSideTranslatorPB implements
    ReconfigurationProtocolPB {

  // Server-side implementation that every PB call is delegated to.
  private final ReconfigurationProtocol impl;

  // Reused singleton; the start call returns no payload.
  private static final StartReconfigurationResponseProto START_RECONFIG_RESP =
      StartReconfigurationResponseProto.newBuilder().build();

  public ReconfigurationProtocolServerSideTranslatorPB(
      ReconfigurationProtocol impl) {
    this.impl = impl;
  }

  @Override
  public StartReconfigurationResponseProto startReconfiguration(
      RpcController controller, StartReconfigurationRequestProto request)
      throws ServiceException {
    try {
      impl.startReconfiguration();
    } catch (IOException e) {
      // Per the protobuf RPC contract, implementation IOExceptions must
      // surface as ServiceException with the cause preserved.
      throw new ServiceException(e);
    }
    return START_RECONFIG_RESP;
  }

  @Override
  public ListReconfigurablePropertiesResponseProto listReconfigurableProperties(
      RpcController controller,
      ListReconfigurablePropertiesRequestProto request)
      throws ServiceException {
    try {
      // Shared PB-conversion logic lives in the server-side utils class.
      return ReconfigurationProtocolServerSideUtils
          .listReconfigurableProperties(impl.listReconfigurableProperties());
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  @Override
  public GetReconfigurationStatusResponseProto getReconfigurationStatus(
      RpcController unused, GetReconfigurationStatusRequestProto request)
      throws ServiceException {
    try {
      return ReconfigurationProtocolServerSideUtils
          .getReconfigurationStatus(impl.getReconfigurationStatus());
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusConfigChangeProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetReconfigurationStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesResponseProto;
import com.google.common.base.Optional;
/**
 * Server-side utility holding logic shared by the parameter
 * reconfiguration protocol translators.
 */
public class ReconfigurationProtocolServerSideUtils {

  // Utility class; never instantiated.
  private ReconfigurationProtocolServerSideUtils() {
  }

  /**
   * Wrap a list of reconfigurable property names into the protobuf
   * response message.
   *
   * @param reconfigurableProperties property names to return to the client
   * @return the populated response proto
   */
  public static ListReconfigurablePropertiesResponseProto
      listReconfigurableProperties(
          List<String> reconfigurableProperties) {
    ListReconfigurablePropertiesResponseProto.Builder response =
        ListReconfigurablePropertiesResponseProto.newBuilder();
    for (String property : reconfigurableProperties) {
      response.addName(property);
    }
    return response.build();
  }

  /**
   * Convert a native {@link ReconfigurationTaskStatus} into the protobuf
   * response message.
   *
   * @param status the task status reported by the NN/DN
   * @return the populated response proto; endTime and per-property changes
   *         are only filled in once the task has stopped
   */
  public static GetReconfigurationStatusResponseProto getReconfigurationStatus(
      ReconfigurationTaskStatus status) {
    GetReconfigurationStatusResponseProto.Builder response =
        GetReconfigurationStatusResponseProto.newBuilder();
    response.setStartTime(status.getStartTime());
    if (status.stopped()) {
      response.setEndTime(status.getEndTime());
      // A stopped task always carries its per-property result map.
      assert status.getStatus() != null;
      for (Map.Entry<PropertyChange, Optional<String>> entry
          : status.getStatus().entrySet()) {
        PropertyChange change = entry.getKey();
        GetReconfigurationStatusConfigChangeProto.Builder changeProto =
            GetReconfigurationStatusConfigChangeProto.newBuilder();
        changeProto.setName(change.prop);
        // oldValue is a required proto field, so null maps to "".
        changeProto.setOldValue(change.oldVal == null ? "" : change.oldVal);
        if (change.newVal != null) {
          changeProto.setNewValue(change.newVal);
        }
        if (entry.getValue().isPresent()) {
          // Get full stack trace.
          changeProto.setErrorMessage(entry.getValue().get());
        }
        response.addChanges(changeProto);
      }
    }
    return response.build();
  }
}

View File

@ -49,6 +49,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY;
import static org.apache.hadoop.util.ExitUtil.terminate;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
@ -124,6 +125,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
@ -143,6 +145,8 @@ import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode;
@ -251,7 +255,7 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class DataNode extends ReconfigurableBase
implements InterDatanodeProtocol, ClientDatanodeProtocol,
TraceAdminProtocol, DataNodeMXBean {
TraceAdminProtocol, DataNodeMXBean, ReconfigurationProtocol {
public static final Logger LOG = LoggerFactory.getLogger(DataNode.class);
static{
@ -914,7 +918,14 @@ public class DataNode extends ReconfigurableBase
conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
.setSecretManager(blockPoolTokenSecretManager).build();
ReconfigurationProtocolServerSideTranslatorPB reconfigurationProtocolXlator
= new ReconfigurationProtocolServerSideTranslatorPB(this);
service = ReconfigurationProtocolService
.newReflectiveBlockingService(reconfigurationProtocolXlator);
DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class, service,
ipcServer);
InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator =
new InterDatanodeProtocolServerSideTranslatorPB(this);
service = InterDatanodeProtocolService
@ -2918,19 +2929,19 @@ public class DataNode extends ReconfigurableBase
confVersion, uptime);
}
@Override // ClientDatanodeProtocol
@Override // ClientDatanodeProtocol & ReconfigurationProtocol
public void startReconfiguration() throws IOException {
checkSuperuserPrivilege();
startReconfigurationTask();
}
@Override // ClientDatanodeProtocol
@Override // ClientDatanodeProtocol & ReconfigurationProtocol
public ReconfigurationTaskStatus getReconfigurationStatus() throws IOException {
checkSuperuserPrivilege();
return getReconfigurationTaskStatus();
}
@Override // ClientDatanodeProtocol
@Override // ClientDatanodeProtocol & ReconfigurationProtocol
public List<String> listReconfigurableProperties()
throws IOException {
return RECONFIGURABLE_PROPERTIES;