From 1861b32eb551a07d748afc6205ea9573f9503eda Mon Sep 17 00:00:00 2001 From: Colin Patrick Mccabe Date: Thu, 25 Sep 2014 13:10:28 -0700 Subject: [PATCH] HDFS-6808. Add command line option to ask DataNode reload configuration. (Lei Xu via Colin McCabe) --- .../hadoop/conf/ReconfigurableBase.java | 140 ++++++++++++++++ .../conf/ReconfigurationTaskStatus.java | 70 ++++++++ .../hadoop/conf/ReconfigurationUtil.java | 5 + .../hadoop/conf/TestReconfiguration.java | 157 +++++++++++++++++- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hdfs/protocol/ClientDatanodeProtocol.java | 12 ++ ...atanodeProtocolServerSideTranslatorPB.java | 61 ++++++- .../ClientDatanodeProtocolTranslatorPB.java | 55 +++++- .../hadoop/hdfs/server/datanode/DataNode.java | 46 ++++- .../apache/hadoop/hdfs/tools/DFSAdmin.java | 95 ++++++++++- .../main/proto/ClientDatanodeProtocol.proto | 30 ++++ .../hadoop/hdfs/tools/TestDFSAdmin.java | 152 +++++++++++++++++ 12 files changed, 813 insertions(+), 13 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationTaskStatus.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java index b872e772259..7521650e6fa 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java @@ -18,9 +18,18 @@ package org.apache.hadoop.conf; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; import org.apache.commons.logging.*; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange; +import java.io.IOException; import java.util.Collection; +import java.util.Collections; +import java.util.Map; /** * Utility base class for implementing the Reconfigurable interface. @@ -34,6 +43,30 @@ public abstract class ReconfigurableBase private static final Log LOG = LogFactory.getLog(ReconfigurableBase.class); + // Use for testing purpose. + private ReconfigurationUtil reconfigurationUtil = new ReconfigurationUtil(); + + /** Background thread to reload configuration. */ + private Thread reconfigThread = null; + private volatile boolean shouldRun = true; + private Object reconfigLock = new Object(); + + /** + * The timestamp when the reconfigThread starts. + */ + private long startTime = 0; + + /** + * The timestamp when the reconfigThread finishes. + */ + private long endTime = 0; + + /** + * A map of . If error message is present, + * it contains the messages about the error occurred when applies the particular + * change. Otherwise, it indicates that the change has been successfully applied. + */ + private Map> status = null; /** * Construct a ReconfigurableBase. @@ -50,6 +83,113 @@ public abstract class ReconfigurableBase super((conf == null) ? new Configuration() : conf); } + @VisibleForTesting + public void setReconfigurationUtil(ReconfigurationUtil ru) { + reconfigurationUtil = Preconditions.checkNotNull(ru); + } + + @VisibleForTesting + public Collection getChangedProperties( + Configuration newConf, Configuration oldConf) { + return reconfigurationUtil.parseChangedProperties(newConf, oldConf); + } + + /** + * A background thread to apply configuration changes. + */ + private static class ReconfigurationThread extends Thread { + private ReconfigurableBase parent; + + ReconfigurationThread(ReconfigurableBase base) { + this.parent = base; + } + + // See {@link ReconfigurationServlet#applyChanges} + public void run() { + LOG.info("Starting reconfiguration task."); + Configuration oldConf = this.parent.getConf(); + Configuration newConf = new Configuration(); + Collection changes = + this.parent.getChangedProperties(newConf, oldConf); + Map> results = Maps.newHashMap(); + for (PropertyChange change : changes) { + String errorMessage = null; + if (!this.parent.isPropertyReconfigurable(change.prop)) { + errorMessage = "Property " + change.prop + + " is not reconfigurable"; + LOG.info(errorMessage); + results.put(change, Optional.of(errorMessage)); + continue; + } + LOG.info("Change property: " + change.prop + " from \"" + + ((change.oldVal == null) ? "" : change.oldVal) + + "\" to \"" + ((change.newVal == null) ? "" : change.newVal) + + "\"."); + try { + this.parent.reconfigurePropertyImpl(change.prop, change.newVal); + } catch (ReconfigurationException e) { + errorMessage = e.toString(); + } + results.put(change, Optional.fromNullable(errorMessage)); + } + + synchronized (this.parent.reconfigLock) { + this.parent.endTime = Time.monotonicNow(); + this.parent.status = Collections.unmodifiableMap(results); + this.parent.reconfigThread = null; + } + } + } + + /** + * Start a reconfiguration task to reload configuration in background. + */ + public void startReconfigurationTask() throws IOException { + synchronized (reconfigLock) { + if (!shouldRun) { + String errorMessage = "The server is stopped."; + LOG.warn(errorMessage); + throw new IOException(errorMessage); + } + if (reconfigThread != null) { + String errorMessage = "Another reconfiguration task is running."; + LOG.warn(errorMessage); + throw new IOException(errorMessage); + } + reconfigThread = new ReconfigurationThread(this); + reconfigThread.setDaemon(true); + reconfigThread.setName("Reconfiguration Task"); + reconfigThread.start(); + startTime = Time.monotonicNow(); + } + } + + public ReconfigurationTaskStatus getReconfigurationTaskStatus() { + synchronized (reconfigLock) { + if (reconfigThread != null) { + return new ReconfigurationTaskStatus(startTime, 0, null); + } + return new ReconfigurationTaskStatus(startTime, endTime, status); + } + } + + public void shutdownReconfigurationTask() { + Thread tempThread; + synchronized (reconfigLock) { + shouldRun = false; + if (reconfigThread == null) { + return; + } + tempThread = reconfigThread; + reconfigThread = null; + } + + try { + tempThread.join(); + } catch (InterruptedException e) { + } + } + /** * {@inheritDoc} * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationTaskStatus.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationTaskStatus.java new file mode 100644 index 00000000000..a3a11cd54de --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationTaskStatus.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.conf; + +import com.google.common.base.Optional; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange; + +import java.util.Map; + +@InterfaceAudience.Public +@InterfaceStability.Stable +public class ReconfigurationTaskStatus { + long startTime; + long endTime; + final Map> status; + + public ReconfigurationTaskStatus(long startTime, long endTime, + Map> status) { + this.startTime = startTime; + this.endTime = endTime; + this.status = status; + } + + /** + * Return true if + * - A reconfiguration task has finished or + * - an active reconfiguration task is running + */ + public boolean hasTask() { + return startTime > 0; + } + + /** + * Return true if the latest reconfiguration task has finished and there is + * no another active task running. + */ + public boolean stopped() { + return endTime > 0; + } + + public long getStartTime() { + return startTime; + } + + public long getEndTime() { + return endTime; + } + + public final Map> getStatus() { + return status; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationUtil.java index ca685f40584..7b107fe5c86 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationUtil.java @@ -63,4 +63,9 @@ public class ReconfigurationUtil { return changes.values(); } + + public Collection parseChangedProperties( + Configuration newConf, Configuration oldConf) { + return getChangedProperties(newConf, oldConf); + } } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestReconfiguration.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestReconfiguration.java index f4367523cbe..07b26eb2b82 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestReconfiguration.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestReconfiguration.java @@ -18,13 +18,32 @@ package org.apache.hadoop.conf; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; +import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange; import org.junit.Test; import org.junit.Before; -import static org.junit.Assert.*; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; + +import java.io.IOException; import java.util.Collection; import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; public class TestReconfiguration { private Configuration conf1; @@ -105,8 +124,8 @@ public class TestReconfiguration { } @Override - public synchronized void reconfigurePropertyImpl(String property, - String newVal) { + public synchronized void reconfigurePropertyImpl( + String property, String newVal) throws ReconfigurationException { // do nothing } @@ -312,4 +331,136 @@ public class TestReconfiguration { } + private static class AsyncReconfigurableDummy extends ReconfigurableBase { + AsyncReconfigurableDummy(Configuration conf) { + super(conf); + } + + final CountDownLatch latch = new CountDownLatch(1); + + @Override + public Collection getReconfigurableProperties() { + return Arrays.asList(PROP1, PROP2, PROP4); + } + + @Override + public synchronized void reconfigurePropertyImpl(String property, + String newVal) throws ReconfigurationException { + try { + latch.await(); + } catch (InterruptedException e) { + // Ignore + } + } + } + + private static void waitAsyncReconfigureTaskFinish(ReconfigurableBase rb) + throws InterruptedException { + ReconfigurationTaskStatus status = null; + int count = 20; + while (count > 0) { + status = rb.getReconfigurationTaskStatus(); + if (status.stopped()) { + break; + } + count--; + Thread.sleep(500); + } + assert(status.stopped()); + } + + @Test + public void testAsyncReconfigure() + throws ReconfigurationException, IOException, InterruptedException { + AsyncReconfigurableDummy dummy = spy(new AsyncReconfigurableDummy(conf1)); + + List changes = Lists.newArrayList(); + changes.add(new PropertyChange("name1", "new1", "old1")); + changes.add(new PropertyChange("name2", "new2", "old2")); + changes.add(new PropertyChange("name3", "new3", "old3")); + doReturn(changes).when(dummy).getChangedProperties( + any(Configuration.class), any(Configuration.class)); + + doReturn(true).when(dummy).isPropertyReconfigurable(eq("name1")); + doReturn(false).when(dummy).isPropertyReconfigurable(eq("name2")); + doReturn(true).when(dummy).isPropertyReconfigurable(eq("name3")); + + doNothing().when(dummy) + .reconfigurePropertyImpl(eq("name1"), anyString()); + doNothing().when(dummy) + .reconfigurePropertyImpl(eq("name2"), anyString()); + doThrow(new ReconfigurationException("NAME3", "NEW3", "OLD3")) + .when(dummy).reconfigurePropertyImpl(eq("name3"), anyString()); + + dummy.startReconfigurationTask(); + + waitAsyncReconfigureTaskFinish(dummy); + ReconfigurationTaskStatus status = dummy.getReconfigurationTaskStatus(); + assertEquals(3, status.getStatus().size()); + for (Map.Entry> result : + status.getStatus().entrySet()) { + PropertyChange change = result.getKey(); + if (change.prop.equals("name1")) { + assertFalse(result.getValue().isPresent()); + } else if (change.prop.equals("name2")) { + assertThat(result.getValue().get(), + containsString("Property name2 is not reconfigurable")); + } else if (change.prop.equals("name3")) { + assertThat(result.getValue().get(), containsString("NAME3")); + } else { + fail("Unknown property: " + change.prop); + } + } + } + + @Test(timeout=30000) + public void testStartReconfigurationFailureDueToExistingRunningTask() + throws InterruptedException, IOException { + AsyncReconfigurableDummy dummy = spy(new AsyncReconfigurableDummy(conf1)); + List changes = Lists.newArrayList( + new PropertyChange(PROP1, "new1", "old1") + ); + doReturn(changes).when(dummy).getChangedProperties( + any(Configuration.class), any(Configuration.class)); + + ReconfigurationTaskStatus status = dummy.getReconfigurationTaskStatus(); + assertFalse(status.hasTask()); + + dummy.startReconfigurationTask(); + status = dummy.getReconfigurationTaskStatus(); + assertTrue(status.hasTask()); + assertFalse(status.stopped()); + + // An active reconfiguration task is running. + try { + dummy.startReconfigurationTask(); + fail("Expect to throw IOException."); + } catch (IOException e) { + GenericTestUtils.assertExceptionContains( + "Another reconfiguration task is running", e); + } + status = dummy.getReconfigurationTaskStatus(); + assertTrue(status.hasTask()); + assertFalse(status.stopped()); + + dummy.latch.countDown(); + waitAsyncReconfigureTaskFinish(dummy); + status = dummy.getReconfigurationTaskStatus(); + assertTrue(status.hasTask()); + assertTrue(status.stopped()); + + // The first task has finished. + dummy.startReconfigurationTask(); + waitAsyncReconfigureTaskFinish(dummy); + ReconfigurationTaskStatus status2 = dummy.getReconfigurationTaskStatus(); + assertTrue(status2.getStartTime() >= status.getEndTime()); + + dummy.shutdownReconfigurationTask(); + try { + dummy.startReconfigurationTask(); + fail("Expect to throw IOException"); + } catch (IOException e) { + GenericTestUtils.assertExceptionContains("The server is stopped", e); + } + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 4b6b0db5df3..487a5470799 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -585,6 +585,9 @@ Release 2.6.0 - UNRELEASED HDFS-7118. Improve diagnostics on storage directory rename operations by using NativeIO#renameTo in Storage#rename. (cnauroth) + HDFS-6808. Add command line option to ask DataNode reload configuration. + (Lei Xu via Colin Patrick McCabe) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java index 475686572f0..3a247350d7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.ReconfigurationTaskStatus; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector; @@ -144,4 +145,15 @@ public interface ClientDatanodeProtocol { * @return software/config version and uptime of the datanode */ DatanodeLocalInfo getDatanodeInfo() throws IOException; + + /** + * Asynchronously reload configuration on disk and apply changes. + */ + void startReconfiguration() throws IOException; + + /** + * Get the status of the previously issued reconfig task. + * @see {@link org.apache.hadoop.conf.ReconfigurationTaskStatus}. + */ + ReconfigurationTaskStatus getReconfigurationStatus() throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java index c8fa2fed66a..ed7f0ae160c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java @@ -20,11 +20,14 @@ package org.apache.hadoop.hdfs.protocolPB; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import com.google.common.base.Optional; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.ReconfigurationTaskStatus; +import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto; @@ -32,6 +35,9 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlo import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto.Builder; @@ -41,11 +47,11 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.Refres import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationResponseProto; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.VersionInfo; import com.google.common.primitives.Longs; import com.google.protobuf.ByteString; @@ -66,6 +72,8 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements DeleteBlockPoolResponseProto.newBuilder().build(); private final static ShutdownDatanodeResponseProto SHUTDOWN_DATANODE_RESP = ShutdownDatanodeResponseProto.newBuilder().build(); + private final static StartReconfigurationResponseProto START_RECONFIG_RESP = + StartReconfigurationResponseProto.newBuilder().build(); private final ClientDatanodeProtocol impl; @@ -182,4 +190,51 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements } return res; } + + @Override + public StartReconfigurationResponseProto startReconfiguration( + RpcController unused, StartReconfigurationRequestProto request) + throws ServiceException { + try { + impl.startReconfiguration(); + } catch (IOException e) { + throw new ServiceException(e); + } + return START_RECONFIG_RESP; + } + + @Override + public GetReconfigurationStatusResponseProto getReconfigurationStatus( + RpcController unused, GetReconfigurationStatusRequestProto request) + throws ServiceException { + GetReconfigurationStatusResponseProto.Builder builder = + GetReconfigurationStatusResponseProto.newBuilder(); + try { + ReconfigurationTaskStatus status = impl.getReconfigurationStatus(); + builder.setStartTime(status.getStartTime()); + if (status.stopped()) { + builder.setEndTime(status.getEndTime()); + assert status.getStatus() != null; + for (Map.Entry> result : + status.getStatus().entrySet()) { + GetReconfigurationStatusConfigChangeProto.Builder changeBuilder = + GetReconfigurationStatusConfigChangeProto.newBuilder(); + PropertyChange change = result.getKey(); + changeBuilder.setName(change.prop); + changeBuilder.setOldValue(change.oldVal != null ? change.oldVal : ""); + if (change.newVal != null) { + changeBuilder.setNewValue(change.newVal); + } + if (result.getValue().isPresent()) { + // Get full stack trace. + changeBuilder.setErrorMessage(result.getValue().get()); + } + builder.addChanges(changeBuilder); + } + } + } catch (IOException e) { + throw new ServiceException(e); + } + return builder.build(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java index ca152b3a83a..00b6ad745ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -21,16 +21,20 @@ import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; +import java.util.Map; import javax.net.SocketFactory; +import com.google.common.base.Optional; +import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.ReconfigurationTaskStatus; +import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; @@ -48,8 +52,11 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdf import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtobufRpcEngine; @@ -87,6 +94,10 @@ public class ClientDatanodeProtocolTranslatorPB implements RefreshNamenodesRequestProto.newBuilder().build(); private final static GetDatanodeInfoRequestProto VOID_GET_DATANODE_INFO = GetDatanodeInfoRequestProto.newBuilder().build(); + private final static GetReconfigurationStatusRequestProto VOID_GET_RECONFIG_STATUS = + GetReconfigurationStatusRequestProto.newBuilder().build(); + private final static StartReconfigurationRequestProto VOID_START_RECONFIG = + StartReconfigurationRequestProto.newBuilder().build(); public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid, Configuration conf, int socketTimeout, boolean connectToDnViaHostname, @@ -282,4 +293,44 @@ public class ClientDatanodeProtocolTranslatorPB implements } } + @Override + public void startReconfiguration() throws IOException { + try { + rpcProxy.startReconfiguration(NULL_CONTROLLER, VOID_START_RECONFIG); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public ReconfigurationTaskStatus getReconfigurationStatus() throws IOException { + GetReconfigurationStatusResponseProto response; + Map> statusMap = null; + long startTime; + long endTime = 0; + try { + response = rpcProxy.getReconfigurationStatus(NULL_CONTROLLER, + VOID_GET_RECONFIG_STATUS); + startTime = response.getStartTime(); + if (response.hasEndTime()) { + endTime = response.getEndTime(); + } + if (response.getChangesCount() > 0) { + statusMap = Maps.newHashMap(); + for (GetReconfigurationStatusConfigChangeProto change : + response.getChangesList()) { + PropertyChange pc = new PropertyChange( + change.getName(), change.getNewValue(), change.getOldValue()); + String errorMessage = null; + if (change.hasErrorMessage()) { + errorMessage = change.getErrorMessage(); + } + statusMap.put(pc, Optional.fromNullable(errorMessage)); + } + } + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + return new ReconfigurationTaskStatus(startTime, endTime, statusMap); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index b1ef18673ae..44e471e8aa5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -90,6 +90,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.ReconfigurableBase; import org.apache.hadoop.conf.ReconfigurationException; +import org.apache.hadoop.conf.ReconfigurationTaskStatus; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; @@ -336,6 +337,21 @@ public class DataNode extends ReconfigurableBase private SpanReceiverHost spanReceiverHost; + /** + * Creates a dummy DataNode for testing purpose. + */ + @VisibleForTesting + @InterfaceAudience.LimitedPrivate("HDFS") + DataNode(final Configuration conf) { + super(conf); + this.fileDescriptorPassingDisabledReason = null; + this.maxNumberOfBlocksToLog = 0; + this.confVersion = null; + this.usersWithLocalPathAccess = null; + this.connectToDnViaHostname = false; + this.getHdfsBlockLocationsEnabled = false; + } + /** * Create the DataNode given a configuration, an array of dataDirs, * and a namenode proxy @@ -478,7 +494,6 @@ public class DataNode extends ReconfigurableBase */ private synchronized void refreshVolumes(String newVolumes) throws Exception { Configuration conf = getConf(); - String oldVolumes = conf.get(DFS_DATANODE_DATA_DIR_KEY); conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes); List locations = getStorageLocations(conf); @@ -486,6 +501,7 @@ public class DataNode extends ReconfigurableBase dataDirs = locations; ChangedVolumes changedVolumes = parseChangedVolumes(); + StringBuilder errorMessageBuilder = new StringBuilder(); try { if (numOldDataDirs + changedVolumes.newLocations.size() - changedVolumes.deactivateLocations.size() <= 0) { @@ -514,8 +530,13 @@ public class DataNode extends ReconfigurableBase // Clean all failed volumes. for (StorageLocation location : changedVolumes.newLocations) { if (!succeedVolumes.contains(location)) { + errorMessageBuilder.append("FAILED TO ADD:"); failedVolumes.add(location); + } else { + errorMessageBuilder.append("ADDED:"); } + errorMessageBuilder.append(location); + errorMessageBuilder.append("\n"); } storage.removeVolumes(failedVolumes); data.removeVolumes(failedVolumes); @@ -529,10 +550,12 @@ public class DataNode extends ReconfigurableBase data.removeVolumes(changedVolumes.deactivateLocations); storage.removeVolumes(changedVolumes.deactivateLocations); } + + if (errorMessageBuilder.length() > 0) { + throw new IOException(errorMessageBuilder.toString()); + } } catch (IOException e) { - LOG.warn("There is IOException when refreshing volumes! " - + "Recover configurations: " + DFS_DATANODE_DATA_DIR_KEY - + " = " + oldVolumes, e); + LOG.warn("There is IOException when refresh volumes! ", e); throw e; } } @@ -1594,6 +1617,9 @@ public class DataNode extends ReconfigurableBase // before the restart prep is done. this.shouldRun = false; + // wait reconfiguration thread, if any, to exit + shutdownReconfigurationTask(); + // wait for all data receiver threads to exit if (this.threadGroup != null) { int sleepMs = 2; @@ -2847,6 +2873,18 @@ public class DataNode extends ReconfigurableBase confVersion, uptime); } + @Override // ClientDatanodeProtocol + public void startReconfiguration() throws IOException { + checkSuperuserPrivilege(); + startReconfigurationTask(); + } + + @Override // ClientDatanodeProtocol + public ReconfigurationTaskStatus getReconfigurationStatus() throws IOException { + checkSuperuserPrivilege(); + return getReconfigurationTaskStatus(); + } + /** * @param addr rpc address of the namenode * @return true if the datanode is connected to a NameNode at the diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java index 525f6d3e25d..298564e41a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java @@ -29,14 +29,19 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.TreeSet; +import com.google.common.base.Optional; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.ReconfigurationTaskStatus; +import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsShell; @@ -378,6 +383,7 @@ public class DFSAdmin extends FsShell { "\t[-refreshSuperUserGroupsConfiguration]\n" + "\t[-refreshCallQueue]\n" + "\t[-refresh [arg1..argn]\n" + + "\t[-reconfig ]\n" + "\t[-printTopology]\n" + "\t[-refreshNamenodes datanode_host:ipc_port]\n"+ "\t[-deleteBlockPool datanode_host:ipc_port blockpoolId [force]]\n"+ @@ -915,9 +921,14 @@ public class DFSAdmin extends FsShell { String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n"; + String reconfig = "-reconfig :\n" + + "\tStarts reconfiguration or gets the status of an ongoing reconfiguration.\n" + + "\tThe second parameter specifies the node type.\n" + + "\tCurrently, only reloading DataNode's configuration is supported.\n"; + String genericRefresh = "-refresh: Arguments are [arg1..argn]\n" + "\tTriggers a runtime-refresh of the resource specified by \n" + - "\ton . All other args after are sent to the host."; + "\ton . All other args after are sent to the host.\n"; String printTopology = "-printTopology: Print a tree of the racks and their\n" + "\t\tnodes as reported by the Namenode\n"; @@ -1011,6 +1022,8 @@ public class DFSAdmin extends FsShell { System.out.println(refreshCallQueue); } else if ("refresh".equals(cmd)) { System.out.println(genericRefresh); + } else if ("reconfig".equals(cmd)) { + System.out.println(reconfig); } else if ("printTopology".equals(cmd)) { System.out.println(printTopology); } else if ("refreshNamenodes".equals(cmd)) { @@ -1055,6 +1068,7 @@ public class DFSAdmin extends FsShell { System.out.println(refreshSuperUserGroupsConfiguration); System.out.println(refreshCallQueue); System.out.println(genericRefresh); + System.out.println(reconfig); System.out.println(printTopology); System.out.println(refreshNamenodes); System.out.println(deleteBlockPool); @@ -1364,6 +1378,75 @@ public class DFSAdmin extends FsShell { return 0; } + public int reconfig(String[] argv, int i) throws IOException { + String nodeType = argv[i]; + String address = argv[i + 1]; + String op = argv[i + 2]; + if ("start".equals(op)) { + return startReconfiguration(nodeType, address); + } else if ("status".equals(op)) { + return getReconfigurationStatus(nodeType, address, System.out, System.err); + } + System.err.println("Unknown operation: " + op); + return -1; + } + + int startReconfiguration(String nodeType, String address) throws IOException { + if ("datanode".equals(nodeType)) { + ClientDatanodeProtocol dnProxy = getDataNodeProxy(address); + dnProxy.startReconfiguration(); + System.out.println("Started reconfiguration task on DataNode " + address); + } else { + System.err.println("Node type " + nodeType + + " does not support reconfiguration."); + } + return -1; + } + + int getReconfigurationStatus(String nodeType, String address, + PrintStream out, PrintStream err) throws IOException { + if ("datanode".equals(nodeType)) { + ClientDatanodeProtocol dnProxy = getDataNodeProxy(address); + try { + ReconfigurationTaskStatus status = dnProxy.getReconfigurationStatus(); + out.print("Reconfiguring status for DataNode[" + address + "]: "); + if (!status.hasTask()) { + out.println("no task was found."); + return 0; + } + out.print("started at " + new Date(status.getStartTime())); + if (!status.stopped()) { + out.println(" and is still running."); + return 0; + } + + out.println(" and finished at " + + new Date(status.getEndTime()).toString() + "."); + for (Map.Entry> result : + status.getStatus().entrySet()) { + if (!result.getValue().isPresent()) { + out.print("SUCCESS: "); + } else { + out.print("FAILED: "); + } + out.printf("Change property %s\n\tFrom: \"%s\"\n\tTo: \"%s\"\n", + result.getKey().prop, result.getKey().oldVal, + result.getKey().newVal); + if (result.getValue().isPresent()) { + out.println("\tError: " + result.getValue().get() + "."); + } + } + } catch (IOException e) { + err.println("DataNode reloading configuration: " + e + "."); + return -1; + } + } else { + err.println("Node type " + nodeType + " does not support reconfiguration."); + return -1; + } + return 0; + } + public int genericRefresh(String[] argv, int i) throws IOException { String hostport = argv[i++]; String identifier = argv[i++]; @@ -1482,6 +1565,9 @@ public class DFSAdmin extends FsShell { } else if ("-refreshCallQueue".equals(cmd)) { System.err.println("Usage: hdfs dfsadmin" + " [-refreshCallQueue]"); + } else if ("-reconfig".equals(cmd)) { + System.err.println("Usage: java DFSAdmin" + + " [-reconfig ]"); } else if ("-refresh".equals(cmd)) { System.err.println("Usage: hdfs dfsadmin" + " [-refresh [arg1..argn]"); @@ -1614,6 +1700,11 @@ public class DFSAdmin extends FsShell { printUsage(cmd); return exitCode; } + } else if ("-reconfig".equals(cmd)) { + if (argv.length != 4) { + printUsage(cmd); + return exitCode; + } } else if ("-deleteBlockPool".equals(cmd)) { if ((argv.length != 3) && (argv.length != 4)) { printUsage(cmd); @@ -1720,6 +1811,8 @@ public class DFSAdmin extends FsShell { exitCode = shutdownDatanode(argv, i); } else if ("-getDatanodeInfo".equals(cmd)) { exitCode = getDatanodeInfo(argv, i); + } else if ("-reconfig".equals(cmd)) { + exitCode = reconfig(argv, i); } else if ("-setStoragePolicy".equals(cmd)) { exitCode = setStoragePolicy(argv); } else if ("-getStoragePolicy".equals(cmd)) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto index 8779f7c3580..61f787bec92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto @@ -149,6 +149,30 @@ message GetDatanodeInfoResponseProto { required DatanodeLocalInfoProto localInfo = 1; } +/** Asks DataNode to reload configuration file. */ +message StartReconfigurationRequestProto { +} + +message StartReconfigurationResponseProto { +} + +/** Query the running status of reconfiguration process */ +message GetReconfigurationStatusRequestProto { +} + +message GetReconfigurationStatusConfigChangeProto { + required string name = 1; + required string oldValue = 2; + optional string newValue = 3; + optional string errorMessage = 4; // It is empty if success. +} + +message GetReconfigurationStatusResponseProto { + required int64 startTime = 1; + optional int64 endTime = 2; + repeated GetReconfigurationStatusConfigChangeProto changes = 3; +} + /** * Protocol used from client to the Datanode. * See the request and response for details of rpc call. @@ -192,4 +216,10 @@ service ClientDatanodeProtocolService { rpc getDatanodeInfo(GetDatanodeInfoRequestProto) returns(GetDatanodeInfoResponseProto); + + rpc getReconfigurationStatus(GetReconfigurationStatusRequestProto) + returns(GetReconfigurationStatusResponseProto); + + rpc startReconfiguration(StartReconfigurationRequestProto) + returns(StartReconfigurationResponseProto); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java new file mode 100644 index 00000000000..35c61f0255f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.tools; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.ReconfigurationUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Scanner; + +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.CoreMatchers.anyOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.hamcrest.CoreMatchers.containsString; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestDFSAdmin { + private MiniDFSCluster cluster; + private DFSAdmin admin; + private DataNode datanode; + + @Before + public void setUp() throws Exception { + Configuration conf = new Configuration(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + + admin = new DFSAdmin(); + datanode = cluster.getDataNodes().get(0); + } + + @After + public void tearDown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + private List getReconfigureStatus(String nodeType, String address) + throws IOException { + ByteArrayOutputStream bufOut = new ByteArrayOutputStream(); + PrintStream out = new PrintStream(bufOut); + ByteArrayOutputStream bufErr = new ByteArrayOutputStream(); + PrintStream err = new PrintStream(bufErr); + admin.getReconfigurationStatus(nodeType, address, out, err); + Scanner scanner = new Scanner(bufOut.toString()); + List outputs = Lists.newArrayList(); + while (scanner.hasNextLine()) { + outputs.add(scanner.nextLine()); + } + return outputs; + } + + @Test(timeout = 30000) + public void testGetReconfigureStatus() + throws IOException, InterruptedException { + ReconfigurationUtil ru = mock(ReconfigurationUtil.class); + datanode.setReconfigurationUtil(ru); + + List changes = + new ArrayList(); + File newDir = new File(cluster.getDataDirectory(), "data_new"); + newDir.mkdirs(); + changes.add(new ReconfigurationUtil.PropertyChange( + DFS_DATANODE_DATA_DIR_KEY, newDir.toString(), + datanode.getConf().get(DFS_DATANODE_DATA_DIR_KEY))); + changes.add(new ReconfigurationUtil.PropertyChange( + "randomKey", "new123", "old456")); + when(ru.parseChangedProperties(any(Configuration.class), + any(Configuration.class))).thenReturn(changes); + + final int port = datanode.getIpcPort(); + final String address = "localhost:" + port; + + admin.startReconfiguration("datanode", address); + + List outputs = null; + int count = 100; + while (count > 0) { + outputs = getReconfigureStatus("datanode", address); + if (!outputs.isEmpty() && outputs.get(0).contains("finished")) { + break; + } + count--; + Thread.sleep(100); + } + assertTrue(count > 0); + assertThat(outputs.size(), is(8)); // 3 (SUCCESS) + 4 (FAILED) + + List locations = DataNode.getStorageLocations( + datanode.getConf()); + assertThat(locations.size(), is(1)); + assertThat(locations.get(0).getFile(), is(newDir)); + // Verify the directory is appropriately formatted. + assertTrue(new File(newDir, Storage.STORAGE_DIR_CURRENT).isDirectory()); + + int successOffset = outputs.get(1).startsWith("SUCCESS:") ? 1 : 5; + int failedOffset = outputs.get(1).startsWith("FAILED:") ? 1: 4; + assertThat(outputs.get(successOffset), + containsString("Change property " + DFS_DATANODE_DATA_DIR_KEY)); + assertThat(outputs.get(successOffset + 1), + is(allOf(containsString("From:"), containsString("data1"), + containsString("data2")))); + assertThat(outputs.get(successOffset + 2), + is(not(anyOf(containsString("data1"), containsString("data2"))))); + assertThat(outputs.get(successOffset + 2), + is(allOf(containsString("To"), containsString("data_new")))); + assertThat(outputs.get(failedOffset), + containsString("Change property randomKey")); + assertThat(outputs.get(failedOffset + 1), + containsString("From: \"old456\"")); + assertThat(outputs.get(failedOffset + 2), + containsString("To: \"new123\"")); + } +} \ No newline at end of file