diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java
index b872e772259..7521650e6fa 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java
@@ -18,9 +18,18 @@
package org.apache.hadoop.conf;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import org.apache.commons.logging.*;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
+import java.io.IOException;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
/**
* Utility base class for implementing the Reconfigurable interface.
@@ -34,6 +43,30 @@ public abstract class ReconfigurableBase
private static final Log LOG =
LogFactory.getLog(ReconfigurableBase.class);
+ // Used for testing purposes.
+ private ReconfigurationUtil reconfigurationUtil = new ReconfigurationUtil();
+
+ /** Background thread to reload configuration. */
+ private Thread reconfigThread = null;
+ private volatile boolean shouldRun = true;
+ private final Object reconfigLock = new Object();
+
+ /**
+ * The timestamp when the reconfigThread starts.
+ */
+ private long startTime = 0;
+
+ /**
+ * The timestamp when the reconfigThread finishes.
+ */
+ private long endTime = 0;
+
+ /**
+ * A map of <changed property, error message>. If the error message is present,
+ * it contains the message about the error that occurred while applying the
+ * particular change. Otherwise, the change has been successfully applied.
+ */
+ private Map<PropertyChange, Optional<String>> status = null;
/**
* Construct a ReconfigurableBase.
@@ -50,6 +83,113 @@ public abstract class ReconfigurableBase
super((conf == null) ? new Configuration() : conf);
}
+ @VisibleForTesting
+ public void setReconfigurationUtil(ReconfigurationUtil ru) {
+ reconfigurationUtil = Preconditions.checkNotNull(ru);
+ }
+
+ @VisibleForTesting
+ public Collection<PropertyChange> getChangedProperties(
+ Configuration newConf, Configuration oldConf) {
+ return reconfigurationUtil.parseChangedProperties(newConf, oldConf);
+ }
+
+ /**
+ * A background thread to apply configuration changes.
+ */
+ private static class ReconfigurationThread extends Thread {
+ private ReconfigurableBase parent;
+
+ ReconfigurationThread(ReconfigurableBase base) {
+ this.parent = base;
+ }
+
+ // See {@link ReconfigurationServlet#applyChanges}
+ public void run() {
+ LOG.info("Starting reconfiguration task.");
+ Configuration oldConf = this.parent.getConf();
+ Configuration newConf = new Configuration();
+ Collection<PropertyChange> changes =
+ this.parent.getChangedProperties(newConf, oldConf);
+ Map<PropertyChange, Optional<String>> results = Maps.newHashMap();
+ for (PropertyChange change : changes) {
+ String errorMessage = null;
+ if (!this.parent.isPropertyReconfigurable(change.prop)) {
+ errorMessage = "Property " + change.prop +
+ " is not reconfigurable";
+ LOG.info(errorMessage);
+ results.put(change, Optional.of(errorMessage));
+ continue;
+ }
+ LOG.info("Change property: " + change.prop + " from \""
+ + ((change.oldVal == null) ? "" : change.oldVal)
+ + "\" to \"" + ((change.newVal == null) ? "" : change.newVal)
+ + "\".");
+ try {
+ this.parent.reconfigurePropertyImpl(change.prop, change.newVal);
+ } catch (ReconfigurationException e) {
+ errorMessage = e.toString();
+ }
+ results.put(change, Optional.fromNullable(errorMessage));
+ }
+
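+ // Publish the results and clear the thread slot atomically, so that a
+ // concurrent getReconfigurationTaskStatus() always sees a consistent view.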
+ synchronized (this.parent.reconfigLock) {
+ this.parent.endTime = Time.monotonicNow();
+ this.parent.status = Collections.unmodifiableMap(results);
+ this.parent.reconfigThread = null;
+ }
+ }
+ }
+
+ /**
+ * Start a reconfiguration task to reload configuration in background.
+ */
+ public void startReconfigurationTask() throws IOException {
+ synchronized (reconfigLock) {
+ if (!shouldRun) {
+ String errorMessage = "The server is stopped.";
+ LOG.warn(errorMessage);
+ throw new IOException(errorMessage);
+ }
+ if (reconfigThread != null) {
+ String errorMessage = "Another reconfiguration task is running.";
+ LOG.warn(errorMessage);
+ throw new IOException(errorMessage);
+ }
+ reconfigThread = new ReconfigurationThread(this);
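+ // Run the task as a daemon thread so that an in-flight reconfiguration
+ // cannot prevent the JVM from exiting.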
+ reconfigThread.setDaemon(true);
+ reconfigThread.setName("Reconfiguration Task");
+ reconfigThread.start();
+ startTime = Time.monotonicNow();
+ }
+ }
+
+ public ReconfigurationTaskStatus getReconfigurationTaskStatus() {
+ synchronized (reconfigLock) {
+ if (reconfigThread != null) {
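+ // A task is still running: report its start time, with no end time
+ // and no per-property results yet.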
+ return new ReconfigurationTaskStatus(startTime, 0, null);
+ }
+ return new ReconfigurationTaskStatus(startTime, endTime, status);
+ }
+ }
+
+ public void shutdownReconfigurationTask() {
+ Thread tempThread;
+ synchronized (reconfigLock) {
+ shouldRun = false;
+ if (reconfigThread == null) {
+ return;
+ }
+ tempThread = reconfigThread;
+ reconfigThread = null;
+ }
+
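+ // Join outside the lock: the reconfiguration thread acquires
+ // reconfigLock at the end of run(), so waiting while holding the
+ // lock could deadlock.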
+ try {
+ tempThread.join();
+ } catch (InterruptedException e) {
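+ // Ignore; the server is shutting down anyway.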
+ }
+ }
+
/**
* {@inheritDoc}
*
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationTaskStatus.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationTaskStatus.java
new file mode 100644
index 00000000000..a3a11cd54de
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationTaskStatus.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.conf;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
+
+import java.util.Map;
+
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ReconfigurationTaskStatus {
+ long startTime;
+ long endTime;
+ final Map<PropertyChange, Optional<String>> status;
+
+ public ReconfigurationTaskStatus(long startTime, long endTime,
+ Map<PropertyChange, Optional<String>> status) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.status = status;
+ }
+
+ /**
+ * Return true if:
+ * - a reconfiguration task has finished, or
+ * - an active reconfiguration task is running.
+ */
+ public boolean hasTask() {
+ return startTime > 0;
+ }
+
+ /**
+ * Return true if the latest reconfiguration task has finished and there is
+ * no other active task running.
+ */
+ public boolean stopped() {
+ return endTime > 0;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public final Map<PropertyChange, Optional<String>> getStatus() {
+ return status;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationUtil.java
index ca685f40584..7b107fe5c86 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationUtil.java
@@ -63,4 +63,9 @@ public class ReconfigurationUtil {
return changes.values();
}
+
+ public Collection<PropertyChange> parseChangedProperties(
+ Configuration newConf, Configuration oldConf) {
+ return getChangedProperties(newConf, oldConf);
+ }
}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestReconfiguration.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestReconfiguration.java
index f4367523cbe..07b26eb2b82 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestReconfiguration.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestReconfiguration.java
@@ -18,13 +18,32 @@
package org.apache.hadoop.conf;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.junit.Test;
import org.junit.Before;
-import static org.junit.Assert.*;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
import java.util.Collection;
import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
public class TestReconfiguration {
private Configuration conf1;
@@ -105,8 +124,8 @@ public class TestReconfiguration {
}
@Override
- public synchronized void reconfigurePropertyImpl(String property,
- String newVal) {
+ public synchronized void reconfigurePropertyImpl(
+ String property, String newVal) throws ReconfigurationException {
// do nothing
}
@@ -312,4 +331,136 @@ public class TestReconfiguration {
}
+ private static class AsyncReconfigurableDummy extends ReconfigurableBase {
+ AsyncReconfigurableDummy(Configuration conf) {
+ super(conf);
+ }
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ @Override
+ public Collection<String> getReconfigurableProperties() {
+ return Arrays.asList(PROP1, PROP2, PROP4);
+ }
+
+ @Override
+ public synchronized void reconfigurePropertyImpl(String property,
+ String newVal) throws ReconfigurationException {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+
+ private static void waitAsyncReconfigureTaskFinish(ReconfigurableBase rb)
+ throws InterruptedException {
+ ReconfigurationTaskStatus status = null;
+ int count = 20;
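+ // Poll for up to ten seconds for the asynchronous task to finish.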
+ while (count > 0) {
+ status = rb.getReconfigurationTaskStatus();
+ if (status.stopped()) {
+ break;
+ }
+ count--;
+ Thread.sleep(500);
+ }
+ assertTrue(status.stopped());
+ }
+
+ @Test
+ public void testAsyncReconfigure()
+ throws ReconfigurationException, IOException, InterruptedException {
+ AsyncReconfigurableDummy dummy = spy(new AsyncReconfigurableDummy(conf1));
+
+ List<PropertyChange> changes = Lists.newArrayList();
+ changes.add(new PropertyChange("name1", "new1", "old1"));
+ changes.add(new PropertyChange("name2", "new2", "old2"));
+ changes.add(new PropertyChange("name3", "new3", "old3"));
+ doReturn(changes).when(dummy).getChangedProperties(
+ any(Configuration.class), any(Configuration.class));
+
+ doReturn(true).when(dummy).isPropertyReconfigurable(eq("name1"));
+ doReturn(false).when(dummy).isPropertyReconfigurable(eq("name2"));
+ doReturn(true).when(dummy).isPropertyReconfigurable(eq("name3"));
+
+ doNothing().when(dummy)
+ .reconfigurePropertyImpl(eq("name1"), anyString());
+ doNothing().when(dummy)
+ .reconfigurePropertyImpl(eq("name2"), anyString());
+ doThrow(new ReconfigurationException("NAME3", "NEW3", "OLD3"))
+ .when(dummy).reconfigurePropertyImpl(eq("name3"), anyString());
+
+ dummy.startReconfigurationTask();
+
+ waitAsyncReconfigureTaskFinish(dummy);
+ ReconfigurationTaskStatus status = dummy.getReconfigurationTaskStatus();
+ assertEquals(3, status.getStatus().size());
+ for (Map.Entry<PropertyChange, Optional<String>> result :
+ status.getStatus().entrySet()) {
+ PropertyChange change = result.getKey();
+ if (change.prop.equals("name1")) {
+ assertFalse(result.getValue().isPresent());
+ } else if (change.prop.equals("name2")) {
+ assertThat(result.getValue().get(),
+ containsString("Property name2 is not reconfigurable"));
+ } else if (change.prop.equals("name3")) {
+ assertThat(result.getValue().get(), containsString("NAME3"));
+ } else {
+ fail("Unknown property: " + change.prop);
+ }
+ }
+ }
+
+ @Test(timeout=30000)
+ public void testStartReconfigurationFailureDueToExistingRunningTask()
+ throws InterruptedException, IOException {
+ AsyncReconfigurableDummy dummy = spy(new AsyncReconfigurableDummy(conf1));
+ List<PropertyChange> changes = Lists.newArrayList(
+ new PropertyChange(PROP1, "new1", "old1")
+ );
+ doReturn(changes).when(dummy).getChangedProperties(
+ any(Configuration.class), any(Configuration.class));
+
+ ReconfigurationTaskStatus status = dummy.getReconfigurationTaskStatus();
+ assertFalse(status.hasTask());
+
+ dummy.startReconfigurationTask();
+ status = dummy.getReconfigurationTaskStatus();
+ assertTrue(status.hasTask());
+ assertFalse(status.stopped());
+
+ // An active reconfiguration task is running.
+ try {
+ dummy.startReconfigurationTask();
+ fail("Expect to throw IOException.");
+ } catch (IOException e) {
+ GenericTestUtils.assertExceptionContains(
+ "Another reconfiguration task is running", e);
+ }
+ status = dummy.getReconfigurationTaskStatus();
+ assertTrue(status.hasTask());
+ assertFalse(status.stopped());
+
+ dummy.latch.countDown();
+ waitAsyncReconfigureTaskFinish(dummy);
+ status = dummy.getReconfigurationTaskStatus();
+ assertTrue(status.hasTask());
+ assertTrue(status.stopped());
+
+ // The first task has finished.
+ dummy.startReconfigurationTask();
+ waitAsyncReconfigureTaskFinish(dummy);
+ ReconfigurationTaskStatus status2 = dummy.getReconfigurationTaskStatus();
+ assertTrue(status2.getStartTime() >= status.getEndTime());
+
+ dummy.shutdownReconfigurationTask();
+ try {
+ dummy.startReconfigurationTask();
+ fail("Expect to throw IOException");
+ } catch (IOException e) {
+ GenericTestUtils.assertExceptionContains("The server is stopped", e);
+ }
+ }
}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 4b6b0db5df3..487a5470799 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -585,6 +585,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7118. Improve diagnostics on storage directory rename operations by
using NativeIO#renameTo in Storage#rename. (cnauroth)
+ HDFS-6808. Add command line option to ask DataNode reload configuration.
+ (Lei Xu via Colin Patrick McCabe)
+
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
index 475686572f0..3a247350d7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
@@ -144,4 +145,15 @@ public interface ClientDatanodeProtocol {
* @return software/config version and uptime of the datanode
*/
DatanodeLocalInfo getDatanodeInfo() throws IOException;
+
+ /**
+ * Asynchronously reload the configuration from disk and apply the changes.
+ */
+ void startReconfiguration() throws IOException;
+
+ /**
+ * Get the status of the previously issued reconfig task.
+ * @see org.apache.hadoop.conf.ReconfigurationTaskStatus
+ */
+ ReconfigurationTaskStatus getReconfigurationStatus() throws IOException;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
index c8fa2fed66a..ed7f0ae160c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
@@ -20,11 +20,14 @@ package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import com.google.common.base.Optional;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto;
@@ -32,6 +35,9 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlo
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto.Builder;
@@ -41,11 +47,11 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.Refres
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationResponseProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.VersionInfo;
import com.google.common.primitives.Longs;
import com.google.protobuf.ByteString;
@@ -66,6 +72,8 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
DeleteBlockPoolResponseProto.newBuilder().build();
private final static ShutdownDatanodeResponseProto SHUTDOWN_DATANODE_RESP =
ShutdownDatanodeResponseProto.newBuilder().build();
+ private final static StartReconfigurationResponseProto START_RECONFIG_RESP =
+ StartReconfigurationResponseProto.newBuilder().build();
private final ClientDatanodeProtocol impl;
@@ -182,4 +190,51 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
}
return res;
}
+
+ @Override
+ public StartReconfigurationResponseProto startReconfiguration(
+ RpcController unused, StartReconfigurationRequestProto request)
+ throws ServiceException {
+ try {
+ impl.startReconfiguration();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return START_RECONFIG_RESP;
+ }
+
+ @Override
+ public GetReconfigurationStatusResponseProto getReconfigurationStatus(
+ RpcController unused, GetReconfigurationStatusRequestProto request)
+ throws ServiceException {
+ GetReconfigurationStatusResponseProto.Builder builder =
+ GetReconfigurationStatusResponseProto.newBuilder();
+ try {
+ ReconfigurationTaskStatus status = impl.getReconfigurationStatus();
+ builder.setStartTime(status.getStartTime());
+ if (status.stopped()) {
+ builder.setEndTime(status.getEndTime());
+ assert status.getStatus() != null;
+ for (Map.Entry<PropertyChange, Optional<String>> result :
+ status.getStatus().entrySet()) {
+ GetReconfigurationStatusConfigChangeProto.Builder changeBuilder =
+ GetReconfigurationStatusConfigChangeProto.newBuilder();
+ PropertyChange change = result.getKey();
+ changeBuilder.setName(change.prop);
+ changeBuilder.setOldValue(change.oldVal != null ? change.oldVal : "");
+ if (change.newVal != null) {
+ changeBuilder.setNewValue(change.newVal);
+ }
+ if (result.getValue().isPresent()) {
+ // Attach the recorded error message for the failed change.
+ changeBuilder.setErrorMessage(result.getValue().get());
+ }
+ builder.addChanges(changeBuilder);
+ }
+ }
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return builder.build();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index ca152b3a83a..00b6ad745ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -21,16 +21,20 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import javax.net.SocketFactory;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -48,8 +52,11 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdf
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -87,6 +94,10 @@ public class ClientDatanodeProtocolTranslatorPB implements
RefreshNamenodesRequestProto.newBuilder().build();
private final static GetDatanodeInfoRequestProto VOID_GET_DATANODE_INFO =
GetDatanodeInfoRequestProto.newBuilder().build();
+ private final static GetReconfigurationStatusRequestProto VOID_GET_RECONFIG_STATUS =
+ GetReconfigurationStatusRequestProto.newBuilder().build();
+ private final static StartReconfigurationRequestProto VOID_START_RECONFIG =
+ StartReconfigurationRequestProto.newBuilder().build();
public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
@@ -282,4 +293,44 @@ public class ClientDatanodeProtocolTranslatorPB implements
}
}
+ @Override
+ public void startReconfiguration() throws IOException {
+ try {
+ rpcProxy.startReconfiguration(NULL_CONTROLLER, VOID_START_RECONFIG);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public ReconfigurationTaskStatus getReconfigurationStatus() throws IOException {
+ GetReconfigurationStatusResponseProto response;
+ Map<PropertyChange, Optional<String>> statusMap = null;
+ long startTime;
+ long endTime = 0;
+ try {
+ response = rpcProxy.getReconfigurationStatus(NULL_CONTROLLER,
+ VOID_GET_RECONFIG_STATUS);
+ startTime = response.getStartTime();
+ if (response.hasEndTime()) {
+ endTime = response.getEndTime();
+ }
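+ // statusMap stays null when the response carries no per-property
+ // changes (e.g. the task is still running).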
+ if (response.getChangesCount() > 0) {
+ statusMap = Maps.newHashMap();
+ for (GetReconfigurationStatusConfigChangeProto change :
+ response.getChangesList()) {
+ PropertyChange pc = new PropertyChange(
+ change.getName(), change.getNewValue(), change.getOldValue());
+ String errorMessage = null;
+ if (change.hasErrorMessage()) {
+ errorMessage = change.getErrorMessage();
+ }
+ statusMap.put(pc, Optional.fromNullable(errorMessage));
+ }
+ }
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ return new ReconfigurationTaskStatus(startTime, endTime, statusMap);
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index b1ef18673ae..44e471e8aa5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -90,6 +90,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurableBase;
import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
@@ -336,6 +337,21 @@ public class DataNode extends ReconfigurableBase
private SpanReceiverHost spanReceiverHost;
+ /**
+ * Creates a dummy DataNode for testing purposes.
+ */
+ @VisibleForTesting
+ @InterfaceAudience.LimitedPrivate("HDFS")
+ DataNode(final Configuration conf) {
+ super(conf);
+ this.fileDescriptorPassingDisabledReason = null;
+ this.maxNumberOfBlocksToLog = 0;
+ this.confVersion = null;
+ this.usersWithLocalPathAccess = null;
+ this.connectToDnViaHostname = false;
+ this.getHdfsBlockLocationsEnabled = false;
+ }
+
/**
* Create the DataNode given a configuration, an array of dataDirs,
* and a namenode proxy
@@ -478,7 +494,6 @@ public class DataNode extends ReconfigurableBase
*/
private synchronized void refreshVolumes(String newVolumes) throws Exception {
Configuration conf = getConf();
- String oldVolumes = conf.get(DFS_DATANODE_DATA_DIR_KEY);
conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
List<StorageLocation> locations = getStorageLocations(conf);
@@ -486,6 +501,7 @@ public class DataNode extends ReconfigurableBase
dataDirs = locations;
ChangedVolumes changedVolumes = parseChangedVolumes();
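+ // Accumulates a report of volumes that could not be added; if non-empty
+ // at the end, it is thrown to the caller as the reconfiguration error.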
+ StringBuilder errorMessageBuilder = new StringBuilder();
try {
if (numOldDataDirs + changedVolumes.newLocations.size() -
changedVolumes.deactivateLocations.size() <= 0) {
@@ -514,8 +530,13 @@ public class DataNode extends ReconfigurableBase
// Clean all failed volumes.
for (StorageLocation location : changedVolumes.newLocations) {
if (!succeedVolumes.contains(location)) {
+ errorMessageBuilder.append("FAILED TO ADD:");
+ errorMessageBuilder.append(location);
+ errorMessageBuilder.append("\n");
failedVolumes.add(location);
}
}
storage.removeVolumes(failedVolumes);
data.removeVolumes(failedVolumes);
@@ -529,10 +550,12 @@ public class DataNode extends ReconfigurableBase
data.removeVolumes(changedVolumes.deactivateLocations);
storage.removeVolumes(changedVolumes.deactivateLocations);
}
+
+ if (errorMessageBuilder.length() > 0) {
+ throw new IOException(errorMessageBuilder.toString());
+ }
} catch (IOException e) {
- LOG.warn("There is IOException when refreshing volumes! "
- + "Recover configurations: " + DFS_DATANODE_DATA_DIR_KEY
- + " = " + oldVolumes, e);
+ LOG.warn("There is IOException when refresh volumes! ", e);
throw e;
}
}
@@ -1594,6 +1617,9 @@ public class DataNode extends ReconfigurableBase
// before the restart prep is done.
this.shouldRun = false;
+ // wait for the reconfiguration thread, if any, to exit
+ shutdownReconfigurationTask();
+
// wait for all data receiver threads to exit
if (this.threadGroup != null) {
int sleepMs = 2;
@@ -2847,6 +2873,18 @@ public class DataNode extends ReconfigurableBase
confVersion, uptime);
}
+ @Override // ClientDatanodeProtocol
+ public void startReconfiguration() throws IOException {
+ checkSuperuserPrivilege();
+ startReconfigurationTask();
+ }
+
+ @Override // ClientDatanodeProtocol
+ public ReconfigurationTaskStatus getReconfigurationStatus() throws IOException {
+ checkSuperuserPrivilege();
+ return getReconfigurationTaskStatus();
+ }
+
/**
* @param addr rpc address of the namenode
* @return true if the datanode is connected to a NameNode at the
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index 525f6d3e25d..298564e41a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -29,14 +29,19 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.TreeSet;
+import com.google.common.base.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
@@ -378,6 +383,7 @@ public class DFSAdmin extends FsShell {
"\t[-refreshSuperUserGroupsConfiguration]\n" +
"\t[-refreshCallQueue]\n" +
"\t[-refresh [arg1..argn]\n" +
+ "\t[-reconfig ]\n" +
"\t[-printTopology]\n" +
"\t[-refreshNamenodes datanode_host:ipc_port]\n"+
"\t[-deleteBlockPool datanode_host:ipc_port blockpoolId [force]]\n"+
@@ -915,9 +921,14 @@ public class DFSAdmin extends FsShell {
String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n";
+ String reconfig = "-reconfig :\n" +
+ "\tStarts reconfiguration or gets the status of an ongoing reconfiguration.\n" +
+ "\tThe second parameter specifies the node type.\n" +
+ "\tCurrently, only reloading DataNode's configuration is supported.\n";
+
String genericRefresh = "-refresh: Arguments are [arg1..argn]\n" +
"\tTriggers a runtime-refresh of the resource specified by \n" +
- "\ton . All other args after are sent to the host.";
+ "\ton . All other args after are sent to the host.\n";
String printTopology = "-printTopology: Print a tree of the racks and their\n" +
"\t\tnodes as reported by the Namenode\n";
@@ -1011,6 +1022,8 @@ public class DFSAdmin extends FsShell {
System.out.println(refreshCallQueue);
} else if ("refresh".equals(cmd)) {
System.out.println(genericRefresh);
+ } else if ("reconfig".equals(cmd)) {
+ System.out.println(reconfig);
} else if ("printTopology".equals(cmd)) {
System.out.println(printTopology);
} else if ("refreshNamenodes".equals(cmd)) {
@@ -1055,6 +1068,7 @@ public class DFSAdmin extends FsShell {
System.out.println(refreshSuperUserGroupsConfiguration);
System.out.println(refreshCallQueue);
System.out.println(genericRefresh);
+ System.out.println(reconfig);
System.out.println(printTopology);
System.out.println(refreshNamenodes);
System.out.println(deleteBlockPool);
@@ -1364,6 +1378,75 @@ public class DFSAdmin extends FsShell {
return 0;
}
+ public int reconfig(String[] argv, int i) throws IOException {
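+ // argv layout: ... -reconfig <nodeType> <host:ipc_port> <start|status>;
+ // i points at the node type argument when this method is invoked.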
+ String nodeType = argv[i];
+ String address = argv[i + 1];
+ String op = argv[i + 2];
+ if ("start".equals(op)) {
+ return startReconfiguration(nodeType, address);
+ } else if ("status".equals(op)) {
+ return getReconfigurationStatus(nodeType, address, System.out, System.err);
+ }
+ System.err.println("Unknown operation: " + op);
+ return -1;
+ }
+
+ int startReconfiguration(String nodeType, String address) throws IOException {
+ if ("datanode".equals(nodeType)) {
+ ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
+ dnProxy.startReconfiguration();
+ System.out.println("Started reconfiguration task on DataNode " + address);
+ } else {
+ System.err.println("Node type " + nodeType +
+ " does not support reconfiguration.");
+ }
+ return -1;
+ }
+
+ int getReconfigurationStatus(String nodeType, String address,
+ PrintStream out, PrintStream err) throws IOException {
+ if ("datanode".equals(nodeType)) {
+ ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
+ try {
+ ReconfigurationTaskStatus status = dnProxy.getReconfigurationStatus();
+ out.print("Reconfiguring status for DataNode[" + address + "]: ");
+ if (!status.hasTask()) {
+ out.println("no task was found.");
+ return 0;
+ }
+ out.print("started at " + new Date(status.getStartTime()));
+ if (!status.stopped()) {
+ out.println(" and is still running.");
+ return 0;
+ }
+
+ out.println(" and finished at " +
+ new Date(status.getEndTime()).toString() + ".");
+ for (Map.Entry<PropertyChange, Optional<String>> result :
+ status.getStatus().entrySet()) {
+ if (!result.getValue().isPresent()) {
+ out.print("SUCCESS: ");
+ } else {
+ out.print("FAILED: ");
+ }
+ out.printf("Change property %s\n\tFrom: \"%s\"\n\tTo: \"%s\"\n",
+ result.getKey().prop, result.getKey().oldVal,
+ result.getKey().newVal);
+ if (result.getValue().isPresent()) {
+ out.println("\tError: " + result.getValue().get() + ".");
+ }
+ }
+ } catch (IOException e) {
+ err.println("DataNode reloading configuration: " + e + ".");
+ return -1;
+ }
+ } else {
+ err.println("Node type " + nodeType + " does not support reconfiguration.");
+ return -1;
+ }
+ return 0;
+ }
+
public int genericRefresh(String[] argv, int i) throws IOException {
String hostport = argv[i++];
String identifier = argv[i++];
@@ -1482,6 +1565,9 @@ public class DFSAdmin extends FsShell {
} else if ("-refreshCallQueue".equals(cmd)) {
System.err.println("Usage: hdfs dfsadmin"
+ " [-refreshCallQueue]");
+ } else if ("-reconfig".equals(cmd)) {
+ System.err.println("Usage: java DFSAdmin"
+ + " [-reconfig ]");
} else if ("-refresh".equals(cmd)) {
System.err.println("Usage: hdfs dfsadmin"
+ " [-refresh [arg1..argn]");
@@ -1614,6 +1700,11 @@ public class DFSAdmin extends FsShell {
printUsage(cmd);
return exitCode;
}
+ } else if ("-reconfig".equals(cmd)) {
+ if (argv.length != 4) {
+ printUsage(cmd);
+ return exitCode;
+ }
} else if ("-deleteBlockPool".equals(cmd)) {
if ((argv.length != 3) && (argv.length != 4)) {
printUsage(cmd);
@@ -1720,6 +1811,8 @@ public class DFSAdmin extends FsShell {
exitCode = shutdownDatanode(argv, i);
} else if ("-getDatanodeInfo".equals(cmd)) {
exitCode = getDatanodeInfo(argv, i);
+ } else if ("-reconfig".equals(cmd)) {
+ exitCode = reconfig(argv, i);
} else if ("-setStoragePolicy".equals(cmd)) {
exitCode = setStoragePolicy(argv);
} else if ("-getStoragePolicy".equals(cmd)) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
index 8779f7c3580..61f787bec92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
@@ -149,6 +149,30 @@ message GetDatanodeInfoResponseProto {
required DatanodeLocalInfoProto localInfo = 1;
}
+/** Asks the DataNode to reload its configuration file. */
+message StartReconfigurationRequestProto {
+}
+
+message StartReconfigurationResponseProto {
+}
+
+/** Queries the running status of the reconfiguration process. */
+message GetReconfigurationStatusRequestProto {
+}
+
+message GetReconfigurationStatusConfigChangeProto {
+ required string name = 1;
+ required string oldValue = 2;
+ optional string newValue = 3;
+ optional string errorMessage = 4; // Unset if the change was applied successfully.
+}
+
+message GetReconfigurationStatusResponseProto {
+ required int64 startTime = 1;
+ optional int64 endTime = 2;
+ repeated GetReconfigurationStatusConfigChangeProto changes = 3;
+}
+
/**
* Protocol used from client to the Datanode.
* See the request and response for details of rpc call.
@@ -192,4 +216,10 @@ service ClientDatanodeProtocolService {
rpc getDatanodeInfo(GetDatanodeInfoRequestProto)
returns(GetDatanodeInfoResponseProto);
+
+ rpc getReconfigurationStatus(GetReconfigurationStatusRequestProto)
+ returns(GetReconfigurationStatusResponseProto);
+
+ rpc startReconfiguration(StartReconfigurationRequestProto)
+ returns(StartReconfigurationResponseProto);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
new file mode 100644
index 00000000000..35c61f0255f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.tools;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Scanner;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDFSAdmin {
+ private MiniDFSCluster cluster;
+ private DFSAdmin admin;
+ private DataNode datanode;
+
+ @Before
+ public void setUp() throws Exception {
+ Configuration conf = new Configuration();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+
+ admin = new DFSAdmin();
+ datanode = cluster.getDataNodes().get(0);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ private List<String> getReconfigureStatus(String nodeType, String address)
+ throws IOException {
+ ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
+ PrintStream out = new PrintStream(bufOut);
+ ByteArrayOutputStream bufErr = new ByteArrayOutputStream();
+ PrintStream err = new PrintStream(bufErr);
+ admin.getReconfigurationStatus(nodeType, address, out, err);
+ Scanner scanner = new Scanner(bufOut.toString());
+ List<String> outputs = Lists.newArrayList();
+ while (scanner.hasNextLine()) {
+ outputs.add(scanner.nextLine());
+ }
+ return outputs;
+ }
+
+ @Test(timeout = 30000)
+ public void testGetReconfigureStatus()
+ throws IOException, InterruptedException {
+ ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
+ datanode.setReconfigurationUtil(ru);
+
+ List<ReconfigurationUtil.PropertyChange> changes =
+ new ArrayList<ReconfigurationUtil.PropertyChange>();
+ File newDir = new File(cluster.getDataDirectory(), "data_new");
+ newDir.mkdirs();
+ changes.add(new ReconfigurationUtil.PropertyChange(
+ DFS_DATANODE_DATA_DIR_KEY, newDir.toString(),
+ datanode.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
+ changes.add(new ReconfigurationUtil.PropertyChange(
+ "randomKey", "new123", "old456"));
+ when(ru.parseChangedProperties(any(Configuration.class),
+ any(Configuration.class))).thenReturn(changes);
+
+ final int port = datanode.getIpcPort();
+ final String address = "localhost:" + port;
+
+ admin.startReconfiguration("datanode", address);
+
+ List<String> outputs = null;
+ int count = 100;
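+ // Poll the status output for up to ten seconds until the task finishes.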
+ while (count > 0) {
+ outputs = getReconfigureStatus("datanode", address);
+ if (!outputs.isEmpty() && outputs.get(0).contains("finished")) {
+ break;
+ }
+ count--;
+ Thread.sleep(100);
+ }
+ assertTrue(count > 0);
+ assertThat(outputs.size(), is(8)); // 1 (header) + 3 (SUCCESS) + 4 (FAILED)
+
+ List<StorageLocation> locations = DataNode.getStorageLocations(
+ datanode.getConf());
+ assertThat(locations.size(), is(1));
+ assertThat(locations.get(0).getFile(), is(newDir));
+ // Verify the directory is appropriately formatted.
+ assertTrue(new File(newDir, Storage.STORAGE_DIR_CURRENT).isDirectory());
+
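+ // The SUCCESS and FAILED blocks may appear in either order, depending on
+ // the iteration order of the status map, so locate each block dynamically.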
+ int successOffset = outputs.get(1).startsWith("SUCCESS:") ? 1 : 5;
+ int failedOffset = outputs.get(1).startsWith("FAILED:") ? 1: 4;
+ assertThat(outputs.get(successOffset),
+ containsString("Change property " + DFS_DATANODE_DATA_DIR_KEY));
+ assertThat(outputs.get(successOffset + 1),
+ is(allOf(containsString("From:"), containsString("data1"),
+ containsString("data2"))));
+ assertThat(outputs.get(successOffset + 2),
+ is(not(anyOf(containsString("data1"), containsString("data2")))));
+ assertThat(outputs.get(successOffset + 2),
+ is(allOf(containsString("To"), containsString("data_new"))));
+ assertThat(outputs.get(failedOffset),
+ containsString("Change property randomKey"));
+ assertThat(outputs.get(failedOffset + 1),
+ containsString("From: \"old456\""));
+ assertThat(outputs.get(failedOffset + 2),
+ containsString("To: \"new123\""));
+ }
+}
\ No newline at end of file