HDFS-8582. Support getting a list of reconfigurable config properties and do not generate spurious reconfig warnings (Lei (Eddy) Xu via Colin P. McCabe)
(cherry picked from commit d112d18324
)
This commit is contained in:
parent
fcd1824b61
commit
c94579f845
|
@ -88,6 +88,11 @@ public abstract class ReconfigurableBase
|
||||||
reconfigurationUtil = Preconditions.checkNotNull(ru);
|
reconfigurationUtil = Preconditions.checkNotNull(ru);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new configuration.
|
||||||
|
*/
|
||||||
|
protected abstract Configuration getNewConf();
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public Collection<PropertyChange> getChangedProperties(
|
public Collection<PropertyChange> getChangedProperties(
|
||||||
Configuration newConf, Configuration oldConf) {
|
Configuration newConf, Configuration oldConf) {
|
||||||
|
@ -108,17 +113,16 @@ public abstract class ReconfigurableBase
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("Starting reconfiguration task.");
|
LOG.info("Starting reconfiguration task.");
|
||||||
Configuration oldConf = this.parent.getConf();
|
Configuration oldConf = this.parent.getConf();
|
||||||
Configuration newConf = new Configuration();
|
Configuration newConf = this.parent.getNewConf();
|
||||||
Collection<PropertyChange> changes =
|
Collection<PropertyChange> changes =
|
||||||
this.parent.getChangedProperties(newConf, oldConf);
|
this.parent.getChangedProperties(newConf, oldConf);
|
||||||
Map<PropertyChange, Optional<String>> results = Maps.newHashMap();
|
Map<PropertyChange, Optional<String>> results = Maps.newHashMap();
|
||||||
for (PropertyChange change : changes) {
|
for (PropertyChange change : changes) {
|
||||||
String errorMessage = null;
|
String errorMessage = null;
|
||||||
if (!this.parent.isPropertyReconfigurable(change.prop)) {
|
if (!this.parent.isPropertyReconfigurable(change.prop)) {
|
||||||
errorMessage = "Property " + change.prop +
|
LOG.info(String.format(
|
||||||
" is not reconfigurable";
|
"Property %s is not configurable: old value: %s, new value: %s",
|
||||||
LOG.info(errorMessage);
|
change.prop, change.oldVal, change.newVal));
|
||||||
results.put(change, Optional.of(errorMessage));
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
LOG.info("Change property: " + change.prop + " from \""
|
LOG.info("Change property: " + change.prop + " from \""
|
||||||
|
|
|
@ -118,6 +118,11 @@ public class TestReconfiguration {
|
||||||
super(conf);
|
super(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Configuration getNewConf() {
|
||||||
|
return new Configuration();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Collection<String> getReconfigurableProperties() {
|
public Collection<String> getReconfigurableProperties() {
|
||||||
return Arrays.asList(PROP1, PROP2, PROP4);
|
return Arrays.asList(PROP1, PROP2, PROP4);
|
||||||
|
@ -336,6 +341,11 @@ public class TestReconfiguration {
|
||||||
super(conf);
|
super(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Configuration getNewConf() {
|
||||||
|
return new Configuration();
|
||||||
|
}
|
||||||
|
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -397,7 +407,7 @@ public class TestReconfiguration {
|
||||||
|
|
||||||
waitAsyncReconfigureTaskFinish(dummy);
|
waitAsyncReconfigureTaskFinish(dummy);
|
||||||
ReconfigurationTaskStatus status = dummy.getReconfigurationTaskStatus();
|
ReconfigurationTaskStatus status = dummy.getReconfigurationTaskStatus();
|
||||||
assertEquals(3, status.getStatus().size());
|
assertEquals(2, status.getStatus().size());
|
||||||
for (Map.Entry<PropertyChange, Optional<String>> result :
|
for (Map.Entry<PropertyChange, Optional<String>> result :
|
||||||
status.getStatus().entrySet()) {
|
status.getStatus().entrySet()) {
|
||||||
PropertyChange change = result.getKey();
|
PropertyChange change = result.getKey();
|
||||||
|
|
|
@ -312,6 +312,10 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-8605. Merge Refactor of DFSOutputStream from HDFS-7285 branch.
|
HDFS-8605. Merge Refactor of DFSOutputStream from HDFS-7285 branch.
|
||||||
(vinayakumarb via wang)
|
(vinayakumarb via wang)
|
||||||
|
|
||||||
|
HDFS-8582. Support getting a list of reconfigurable config properties and
|
||||||
|
do not generate spurious reconfig warnings (Lei (Eddy) Xu via Colin P.
|
||||||
|
McCabe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -160,6 +160,11 @@ public interface ClientDatanodeProtocol {
|
||||||
*/
|
*/
|
||||||
ReconfigurationTaskStatus getReconfigurationStatus() throws IOException;
|
ReconfigurationTaskStatus getReconfigurationStatus() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a list of allowed properties for reconfiguration.
|
||||||
|
*/
|
||||||
|
List<String> listReconfigurableProperties() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Trigger a new block report.
|
* Trigger a new block report.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -44,6 +44,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdf
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto.Builder;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto.Builder;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
|
||||||
|
@ -208,6 +210,23 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
|
||||||
return START_RECONFIG_RESP;
|
return START_RECONFIG_RESP;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ListReconfigurablePropertiesResponseProto listReconfigurableProperties(
|
||||||
|
RpcController controller,
|
||||||
|
ListReconfigurablePropertiesRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
ListReconfigurablePropertiesResponseProto.Builder builder =
|
||||||
|
ListReconfigurablePropertiesResponseProto.newBuilder();
|
||||||
|
try {
|
||||||
|
for (String name : impl.listReconfigurableProperties()) {
|
||||||
|
builder.addName(name);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetReconfigurationStatusResponseProto getReconfigurationStatus(
|
public GetReconfigurationStatusResponseProto getReconfigurationStatus(
|
||||||
RpcController unused, GetReconfigurationStatusRequestProto request)
|
RpcController unused, GetReconfigurationStatusRequestProto request)
|
||||||
|
|
|
@ -52,6 +52,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDat
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusResponseProto;
|
||||||
|
@ -59,7 +61,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetRec
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportResponseProto;
|
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.ipc.ProtobufHelper;
|
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
|
@ -101,6 +102,9 @@ public class ClientDatanodeProtocolTranslatorPB implements
|
||||||
GetReconfigurationStatusRequestProto.newBuilder().build();
|
GetReconfigurationStatusRequestProto.newBuilder().build();
|
||||||
private final static StartReconfigurationRequestProto VOID_START_RECONFIG =
|
private final static StartReconfigurationRequestProto VOID_START_RECONFIG =
|
||||||
StartReconfigurationRequestProto.newBuilder().build();
|
StartReconfigurationRequestProto.newBuilder().build();
|
||||||
|
private static final ListReconfigurablePropertiesRequestProto
|
||||||
|
VOID_LIST_RECONFIGURABLE_PROPERTIES =
|
||||||
|
ListReconfigurablePropertiesRequestProto.newBuilder().build();
|
||||||
|
|
||||||
public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
|
public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
|
||||||
Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
|
Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
|
||||||
|
@ -337,6 +341,19 @@ public class ClientDatanodeProtocolTranslatorPB implements
|
||||||
return new ReconfigurationTaskStatus(startTime, endTime, statusMap);
|
return new ReconfigurationTaskStatus(startTime, endTime, statusMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<String> listReconfigurableProperties()
|
||||||
|
throws IOException {
|
||||||
|
ListReconfigurablePropertiesResponseProto response;
|
||||||
|
try {
|
||||||
|
response = rpcProxy.listReconfigurableProperties(NULL_CONTROLLER,
|
||||||
|
VOID_LIST_RECONFIGURABLE_PROPERTIES);
|
||||||
|
return response.getNameList();
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void triggerBlockReport(BlockReportOptions options)
|
public void triggerBlockReport(BlockReportOptions options)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -273,6 +273,11 @@ public class DataNode extends ReconfigurableBase
|
||||||
|
|
||||||
static final int CURRENT_BLOCK_FORMAT_VERSION = 1;
|
static final int CURRENT_BLOCK_FORMAT_VERSION = 1;
|
||||||
|
|
||||||
|
/** A list of property that are reconfigurable at runtime. */
|
||||||
|
private static final List<String> RECONFIGURABLE_PROPERTIES =
|
||||||
|
Collections.unmodifiableList(
|
||||||
|
Arrays.asList(DFS_DATANODE_DATA_DIR_KEY));
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Use {@link NetUtils#createSocketAddr(String)} instead.
|
* Use {@link NetUtils#createSocketAddr(String)} instead.
|
||||||
*/
|
*/
|
||||||
|
@ -450,6 +455,11 @@ public class DataNode extends ReconfigurableBase
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override // ReconfigurableBase
|
||||||
|
protected Configuration getNewConf() {
|
||||||
|
return new HdfsConfiguration();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reconfigurePropertyImpl(String property, String newVal)
|
public void reconfigurePropertyImpl(String property, String newVal)
|
||||||
throws ReconfigurationException {
|
throws ReconfigurationException {
|
||||||
|
@ -470,11 +480,9 @@ public class DataNode extends ReconfigurableBase
|
||||||
/**
|
/**
|
||||||
* Get a list of the keys of the re-configurable properties in configuration.
|
* Get a list of the keys of the re-configurable properties in configuration.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override // Reconfigurable
|
||||||
public Collection<String> getReconfigurableProperties() {
|
public Collection<String> getReconfigurableProperties() {
|
||||||
List<String> reconfigurable =
|
return RECONFIGURABLE_PROPERTIES;
|
||||||
Collections.unmodifiableList(Arrays.asList(DFS_DATANODE_DATA_DIR_KEY));
|
|
||||||
return reconfigurable;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -3052,6 +3060,12 @@ public class DataNode extends ReconfigurableBase
|
||||||
return getReconfigurationTaskStatus();
|
return getReconfigurationTaskStatus();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override // ClientDatanodeProtocol
|
||||||
|
public List<String> listReconfigurableProperties()
|
||||||
|
throws IOException {
|
||||||
|
return RECONFIGURABLE_PROPERTIES;
|
||||||
|
}
|
||||||
|
|
||||||
@Override // ClientDatanodeProtocol
|
@Override // ClientDatanodeProtocol
|
||||||
public void triggerBlockReport(BlockReportOptions options)
|
public void triggerBlockReport(BlockReportOptions options)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -408,7 +408,7 @@ public class DFSAdmin extends FsShell {
|
||||||
"\t[-refreshSuperUserGroupsConfiguration]\n" +
|
"\t[-refreshSuperUserGroupsConfiguration]\n" +
|
||||||
"\t[-refreshCallQueue]\n" +
|
"\t[-refreshCallQueue]\n" +
|
||||||
"\t[-refresh <host:ipc_port> <key> [arg1..argn]\n" +
|
"\t[-refresh <host:ipc_port> <key> [arg1..argn]\n" +
|
||||||
"\t[-reconfig <datanode|...> <host:ipc_port> <start|status>]\n" +
|
"\t[-reconfig <datanode|...> <host:ipc_port> <start|status|properties>]\n" +
|
||||||
"\t[-printTopology]\n" +
|
"\t[-printTopology]\n" +
|
||||||
"\t[-refreshNamenodes datanode_host:ipc_port]\n"+
|
"\t[-refreshNamenodes datanode_host:ipc_port]\n"+
|
||||||
"\t[-deleteBlockPool datanode_host:ipc_port blockpoolId [force]]\n"+
|
"\t[-deleteBlockPool datanode_host:ipc_port blockpoolId [force]]\n"+
|
||||||
|
@ -951,8 +951,9 @@ public class DFSAdmin extends FsShell {
|
||||||
|
|
||||||
String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n";
|
String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n";
|
||||||
|
|
||||||
String reconfig = "-reconfig <datanode|...> <host:ipc_port> <start|status>:\n" +
|
String reconfig = "-reconfig <datanode|...> <host:ipc_port> <start|status|properties>:\n" +
|
||||||
"\tStarts reconfiguration or gets the status of an ongoing reconfiguration.\n" +
|
"\tStarts or gets the status of a reconfiguration operation, \n" +
|
||||||
|
"\tor gets a list of reconfigurable properties.\n" +
|
||||||
"\tThe second parameter specifies the node type.\n" +
|
"\tThe second parameter specifies the node type.\n" +
|
||||||
"\tCurrently, only reloading DataNode's configuration is supported.\n";
|
"\tCurrently, only reloading DataNode's configuration is supported.\n";
|
||||||
|
|
||||||
|
@ -1411,6 +1412,9 @@ public class DFSAdmin extends FsShell {
|
||||||
return startReconfiguration(nodeType, address);
|
return startReconfiguration(nodeType, address);
|
||||||
} else if ("status".equals(op)) {
|
} else if ("status".equals(op)) {
|
||||||
return getReconfigurationStatus(nodeType, address, System.out, System.err);
|
return getReconfigurationStatus(nodeType, address, System.out, System.err);
|
||||||
|
} else if ("properties".equals(op)) {
|
||||||
|
return getReconfigurableProperties(
|
||||||
|
nodeType, address, System.out, System.err);
|
||||||
}
|
}
|
||||||
System.err.println("Unknown operation: " + op);
|
System.err.println("Unknown operation: " + op);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1448,18 +1452,24 @@ public class DFSAdmin extends FsShell {
|
||||||
|
|
||||||
out.println(" and finished at " +
|
out.println(" and finished at " +
|
||||||
new Date(status.getEndTime()).toString() + ".");
|
new Date(status.getEndTime()).toString() + ".");
|
||||||
|
if (status.getStatus() == null) {
|
||||||
|
// Nothing to report.
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
for (Map.Entry<PropertyChange, Optional<String>> result :
|
for (Map.Entry<PropertyChange, Optional<String>> result :
|
||||||
status.getStatus().entrySet()) {
|
status.getStatus().entrySet()) {
|
||||||
if (!result.getValue().isPresent()) {
|
if (!result.getValue().isPresent()) {
|
||||||
out.print("SUCCESS: ");
|
out.printf(
|
||||||
} else {
|
"SUCCESS: Changed property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
|
||||||
out.print("FAILED: ");
|
|
||||||
}
|
|
||||||
out.printf("Change property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
|
|
||||||
result.getKey().prop, result.getKey().oldVal,
|
result.getKey().prop, result.getKey().oldVal,
|
||||||
result.getKey().newVal);
|
result.getKey().newVal);
|
||||||
if (result.getValue().isPresent()) {
|
} else {
|
||||||
out.println("\tError: " + result.getValue().get() + ".");
|
final String errorMsg = result.getValue().get();
|
||||||
|
out.printf(
|
||||||
|
"FAILED: Change property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
|
||||||
|
result.getKey().prop, result.getKey().oldVal,
|
||||||
|
result.getKey().newVal);
|
||||||
|
out.println("\tError: " + errorMsg + ".");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -1467,7 +1477,32 @@ public class DFSAdmin extends FsShell {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
err.println("Node type " + nodeType + " does not support reconfiguration.");
|
err.println("Node type " + nodeType +
|
||||||
|
" does not support reconfiguration.");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int getReconfigurableProperties(String nodeType, String address,
|
||||||
|
PrintStream out, PrintStream err) throws IOException {
|
||||||
|
if ("datanode".equals(nodeType)) {
|
||||||
|
ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
|
||||||
|
try {
|
||||||
|
List<String> properties =
|
||||||
|
dnProxy.listReconfigurableProperties();
|
||||||
|
out.println(
|
||||||
|
"Configuration properties that are allowed to be reconfigured:");
|
||||||
|
for (String name : properties) {
|
||||||
|
out.println(name);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
err.println("DataNode reconfiguration: " + e + ".");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err.println("Node type " + nodeType +
|
||||||
|
" does not support reconfiguration.");
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -180,6 +180,14 @@ message GetReconfigurationStatusResponseProto {
|
||||||
repeated GetReconfigurationStatusConfigChangeProto changes = 3;
|
repeated GetReconfigurationStatusConfigChangeProto changes = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message ListReconfigurablePropertiesRequestProto {
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Query the reconfigurable properties on DataNode. */
|
||||||
|
message ListReconfigurablePropertiesResponseProto {
|
||||||
|
repeated string name = 1;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Protocol used from client to the Datanode.
|
* Protocol used from client to the Datanode.
|
||||||
* See the request and response for details of rpc call.
|
* See the request and response for details of rpc call.
|
||||||
|
@ -230,6 +238,10 @@ service ClientDatanodeProtocolService {
|
||||||
rpc startReconfiguration(StartReconfigurationRequestProto)
|
rpc startReconfiguration(StartReconfigurationRequestProto)
|
||||||
returns(StartReconfigurationResponseProto);
|
returns(StartReconfigurationResponseProto);
|
||||||
|
|
||||||
|
rpc listReconfigurableProperties(
|
||||||
|
ListReconfigurablePropertiesRequestProto)
|
||||||
|
returns(ListReconfigurablePropertiesResponseProto);
|
||||||
|
|
||||||
rpc triggerBlockReport(TriggerBlockReportRequestProto)
|
rpc triggerBlockReport(TriggerBlockReportRequestProto)
|
||||||
returns(TriggerBlockReportResponseProto);
|
returns(TriggerBlockReportResponseProto);
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.ReconfigurationUtil;
|
import org.apache.hadoop.conf.ReconfigurationUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
@ -42,6 +43,7 @@ import static org.hamcrest.CoreMatchers.allOf;
|
||||||
import static org.hamcrest.CoreMatchers.anyOf;
|
import static org.hamcrest.CoreMatchers.anyOf;
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
import static org.hamcrest.CoreMatchers.not;
|
import static org.hamcrest.CoreMatchers.not;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.hamcrest.CoreMatchers.containsString;
|
import static org.hamcrest.CoreMatchers.containsString;
|
||||||
|
@ -50,18 +52,17 @@ import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class TestDFSAdmin {
|
public class TestDFSAdmin {
|
||||||
|
private Configuration conf = null;
|
||||||
private MiniDFSCluster cluster;
|
private MiniDFSCluster cluster;
|
||||||
private DFSAdmin admin;
|
private DFSAdmin admin;
|
||||||
private DataNode datanode;
|
private DataNode datanode;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
conf = new Configuration();
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
restartCluster();
|
||||||
cluster.waitActive();
|
|
||||||
|
|
||||||
admin = new DFSAdmin();
|
admin = new DFSAdmin();
|
||||||
datanode = cluster.getDataNodes().get(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -72,6 +73,15 @@ public class TestDFSAdmin {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void restartCluster() throws IOException {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
datanode = cluster.getDataNodes().get(0);
|
||||||
|
}
|
||||||
|
|
||||||
private List<String> getReconfigureStatus(String nodeType, String address)
|
private List<String> getReconfigureStatus(String nodeType, String address)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
|
ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
|
||||||
|
@ -87,16 +97,26 @@ public class TestDFSAdmin {
|
||||||
return outputs;
|
return outputs;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 30000)
|
/**
|
||||||
public void testGetReconfigureStatus()
|
* Test reconfiguration and check the status outputs.
|
||||||
|
* @param expectedSuccuss set true if the reconfiguration task should success.
|
||||||
|
* @throws IOException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
private void testGetReconfigurationStatus(boolean expectedSuccuss)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
|
ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
|
||||||
datanode.setReconfigurationUtil(ru);
|
datanode.setReconfigurationUtil(ru);
|
||||||
|
|
||||||
List<ReconfigurationUtil.PropertyChange> changes =
|
List<ReconfigurationUtil.PropertyChange> changes =
|
||||||
new ArrayList<ReconfigurationUtil.PropertyChange>();
|
new ArrayList<>();
|
||||||
File newDir = new File(cluster.getDataDirectory(), "data_new");
|
File newDir = new File(cluster.getDataDirectory(), "data_new");
|
||||||
|
if (expectedSuccuss) {
|
||||||
newDir.mkdirs();
|
newDir.mkdirs();
|
||||||
|
} else {
|
||||||
|
// Inject failure.
|
||||||
|
newDir.createNewFile();
|
||||||
|
}
|
||||||
changes.add(new ReconfigurationUtil.PropertyChange(
|
changes.add(new ReconfigurationUtil.PropertyChange(
|
||||||
DFS_DATANODE_DATA_DIR_KEY, newDir.toString(),
|
DFS_DATANODE_DATA_DIR_KEY, newDir.toString(),
|
||||||
datanode.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
|
datanode.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
|
||||||
|
@ -121,31 +141,74 @@ public class TestDFSAdmin {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
assertTrue(count > 0);
|
assertTrue(count > 0);
|
||||||
assertThat(outputs.size(), is(8)); // 3 (SUCCESS) + 4 (FAILED)
|
if (expectedSuccuss) {
|
||||||
|
assertThat(outputs.size(), is(4));
|
||||||
|
} else {
|
||||||
|
assertThat(outputs.size(), is(6));
|
||||||
|
}
|
||||||
|
|
||||||
List<StorageLocation> locations = DataNode.getStorageLocations(
|
List<StorageLocation> locations = DataNode.getStorageLocations(
|
||||||
datanode.getConf());
|
datanode.getConf());
|
||||||
|
if (expectedSuccuss) {
|
||||||
assertThat(locations.size(), is(1));
|
assertThat(locations.size(), is(1));
|
||||||
assertThat(locations.get(0).getFile(), is(newDir));
|
assertThat(locations.get(0).getFile(), is(newDir));
|
||||||
// Verify the directory is appropriately formatted.
|
// Verify the directory is appropriately formatted.
|
||||||
assertTrue(new File(newDir, Storage.STORAGE_DIR_CURRENT).isDirectory());
|
assertTrue(new File(newDir, Storage.STORAGE_DIR_CURRENT).isDirectory());
|
||||||
|
} else {
|
||||||
|
assertTrue(locations.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
int successOffset = outputs.get(1).startsWith("SUCCESS:") ? 1 : 5;
|
int offset = 1;
|
||||||
int failedOffset = outputs.get(1).startsWith("FAILED:") ? 1: 4;
|
if (expectedSuccuss) {
|
||||||
assertThat(outputs.get(successOffset),
|
assertThat(outputs.get(offset),
|
||||||
containsString("Change property " + DFS_DATANODE_DATA_DIR_KEY));
|
containsString("SUCCESS: Changed property " +
|
||||||
assertThat(outputs.get(successOffset + 1),
|
DFS_DATANODE_DATA_DIR_KEY));
|
||||||
|
} else {
|
||||||
|
assertThat(outputs.get(offset),
|
||||||
|
containsString("FAILED: Change property " +
|
||||||
|
DFS_DATANODE_DATA_DIR_KEY));
|
||||||
|
}
|
||||||
|
assertThat(outputs.get(offset + 1),
|
||||||
is(allOf(containsString("From:"), containsString("data1"),
|
is(allOf(containsString("From:"), containsString("data1"),
|
||||||
containsString("data2"))));
|
containsString("data2"))));
|
||||||
assertThat(outputs.get(successOffset + 2),
|
assertThat(outputs.get(offset + 2),
|
||||||
is(not(anyOf(containsString("data1"), containsString("data2")))));
|
is(not(anyOf(containsString("data1"), containsString("data2")))));
|
||||||
assertThat(outputs.get(successOffset + 2),
|
assertThat(outputs.get(offset + 2),
|
||||||
is(allOf(containsString("To"), containsString("data_new"))));
|
is(allOf(containsString("To"), containsString("data_new"))));
|
||||||
assertThat(outputs.get(failedOffset),
|
}
|
||||||
containsString("Change property randomKey"));
|
|
||||||
assertThat(outputs.get(failedOffset + 1),
|
@Test(timeout = 30000)
|
||||||
containsString("From: \"old456\""));
|
public void testGetReconfigurationStatus()
|
||||||
assertThat(outputs.get(failedOffset + 2),
|
throws IOException, InterruptedException {
|
||||||
containsString("To: \"new123\""));
|
testGetReconfigurationStatus(true);
|
||||||
|
restartCluster();
|
||||||
|
testGetReconfigurationStatus(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<String> getReconfigurationAllowedProperties(
|
||||||
|
String nodeType, String address)
|
||||||
|
throws IOException {
|
||||||
|
ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
|
||||||
|
PrintStream out = new PrintStream(bufOut);
|
||||||
|
ByteArrayOutputStream bufErr = new ByteArrayOutputStream();
|
||||||
|
PrintStream err = new PrintStream(bufErr);
|
||||||
|
admin.getReconfigurableProperties(nodeType, address, out, err);
|
||||||
|
Scanner scanner = new Scanner(bufOut.toString());
|
||||||
|
List<String> outputs = Lists.newArrayList();
|
||||||
|
while (scanner.hasNextLine()) {
|
||||||
|
outputs.add(scanner.nextLine());
|
||||||
|
}
|
||||||
|
return outputs;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testGetReconfigAllowedProperties() throws IOException {
|
||||||
|
final int port = datanode.getIpcPort();
|
||||||
|
final String address = "localhost:" + port;
|
||||||
|
List<String> outputs =
|
||||||
|
getReconfigurationAllowedProperties("datanode", address);
|
||||||
|
assertEquals(2, outputs.size());
|
||||||
|
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
|
||||||
|
outputs.get(1));
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue