Backport HDFS-5138 from trunk: Support HDFS upgrade in HA. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1581067 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2014-03-24 21:08:50 +00:00
parent da2d86bd04
commit b964c4424f
46 changed files with 2440 additions and 350 deletions

View File

@ -194,6 +194,8 @@ Release 2.4.0 - UNRELEASED
HDFS-6050. NFS does not handle exceptions correctly in a few places
(brandonli)
HDFS-5138. Support HDFS upgrade in HA. (atm via todd)
OPTIMIZATIONS
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery

View File

@ -189,5 +189,10 @@
<Class name="org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor" />
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
</Match>
<Match>
<Class name="org.apache.hadoop.hdfs.DFSUtil"/>
<Method name="assertAllResultsEqual" />
<Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE" />
</Match>
</FindBugsFilter>

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.contrib.bkjournal;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
@ -665,6 +667,37 @@ public class BookKeeperJournalManager implements JournalManager {
throw new UnsupportedOperationException();
}
@Override
public void doPreUpgrade() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void doUpgrade(Storage storage) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public long getJournalCTime() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void doFinalize() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
int targetLayoutVersion) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void doRollback() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException {
try {

View File

@ -316,7 +316,7 @@ public class TestBookKeeperAsHASharedDir {
} catch (IOException ioe) {
LOG.info("Got expected exception", ioe);
GenericTestUtils.assertExceptionContains(
"Cannot start an HA namenode with name dirs that need recovery", ioe);
"storage directory does not exist or is not accessible", ioe);
}
}

View File

@ -42,6 +42,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.security.SecureRandom;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@ -52,7 +53,6 @@ import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.net.SocketFactory;
@ -71,7 +71,6 @@ import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -635,9 +634,23 @@ public class DFSUtil {
return ret;
}
/**
* Get all of the RPC addresses of the individual NNs in a given nameservice.
*
* @param conf Configuration
* @param nsId the nameservice whose NNs addresses we want.
* @param defaultValue default address to return in case key is not found.
* @return A map from nnId -> RPC address of each NN in the nameservice.
*/
public static Map<String, InetSocketAddress> getRpcAddressesForNameserviceId(
Configuration conf, String nsId, String defaultValue) {
return getAddressesForNameserviceId(conf, nsId, defaultValue,
DFS_NAMENODE_RPC_ADDRESS_KEY);
}
private static Map<String, InetSocketAddress> getAddressesForNameserviceId(
Configuration conf, String nsId, String defaultValue,
String[] keys) {
String... keys) {
Collection<String> nnIds = getNameNodeIds(conf, nsId);
Map<String, InetSocketAddress> ret = Maps.newHashMap();
for (String nnId : emptyAsSingletonNull(nnIds)) {
@ -1693,4 +1706,32 @@ public class DFSUtil {
}
return ttl*1000;
}
/**
* Assert that all objects in the collection are equal. Returns silently if
* so, throws an AssertionError if any object is not equal. All null values
* are considered equal.
*
* @param objects the collection of objects to check for equality.
*/
public static void assertAllResultsEqual(Collection<?> objects) {
Object[] resultsArray = objects.toArray();
if (resultsArray.length == 0)
return;
for (int i = 0; i < resultsArray.length; i++) {
if (i == 0)
continue;
else {
Object currElement = resultsArray[i];
Object lastElement = resultsArray[i - 1];
if ((currElement == null && currElement != lastElement) ||
(currElement != null && !currElement.equals(lastElement))) {
throw new AssertionError("Not all elements match in results: " +
Arrays.toString(resultsArray));
}
}
}
}
}

View File

@ -26,7 +26,6 @@ import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -38,14 +37,14 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@ -301,4 +300,55 @@ public class HAUtil {
DFSClient dfsClient = dfs.getClient();
return RPC.getServerAddress(dfsClient.getNamenode());
}
/**
* Get an RPC proxy for each NN in an HA nameservice. Used when a given RPC
* call should be made on every NN in an HA nameservice, not just the active.
*
* @param conf configuration
* @param nsId the nameservice to get all of the proxies for.
* @return a list of RPC proxies for each NN in the nameservice.
* @throws IOException in the event of error.
*/
public static List<ClientProtocol> getProxiesForAllNameNodesInNameservice(
Configuration conf, String nsId) throws IOException {
Map<String, InetSocketAddress> nnAddresses =
DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null);
List<ClientProtocol> namenodes = new ArrayList<ClientProtocol>();
for (InetSocketAddress nnAddress : nnAddresses.values()) {
NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
proxyInfo = NameNodeProxies.createNonHAProxy(conf,
nnAddress, ClientProtocol.class,
UserGroupInformation.getCurrentUser(), false);
namenodes.add(proxyInfo.getProxy());
}
return namenodes;
}
/**
* Used to ensure that at least one of the given HA NNs is currently in the
* active state..
*
* @param namenodes list of RPC proxies for each NN to check.
* @return true if at least one NN is active, false if all are in the standby state.
* @throws IOException in the event of error.
*/
public static boolean isAtLeastOneActive(List<ClientProtocol> namenodes)
throws IOException {
for (ClientProtocol namenode : namenodes) {
try {
namenode.getFileInfo("/");
return true;
} catch (RemoteException re) {
IOException cause = re.unwrapRemoteException();
if (cause instanceof StandbyException) {
// This is expected to happen for a standby NN.
} else {
throw re;
}
}
}
return false;
}
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochR
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@ -153,5 +154,18 @@ interface AsyncLogger {
*/
public void appendReport(StringBuilder sb);
public ListenableFuture<Void> doPreUpgrade();
public ListenableFuture<Void> doUpgrade(StorageInfo sInfo);
public ListenableFuture<Void> doFinalize();
public ListenableFuture<Boolean> canRollBack(StorageInfo storage,
StorageInfo prevStorage, int targetLayoutVersion);
public ListenableFuture<Void> doRollback();
public ListenableFuture<Long> getJournalCTime();
public ListenableFuture<Void> discardSegments(long startTxId);
}

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJourna
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@ -317,4 +318,71 @@ class AsyncLoggerSet {
}
return QuorumCall.create(calls);
}
QuorumCall<AsyncLogger, Void> doPreUpgrade() {
Map<AsyncLogger, ListenableFuture<Void>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Void> future =
logger.doPreUpgrade();
calls.put(logger, future);
}
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Void> doUpgrade(StorageInfo sInfo) {
Map<AsyncLogger, ListenableFuture<Void>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Void> future =
logger.doUpgrade(sInfo);
calls.put(logger, future);
}
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Void> doFinalize() {
Map<AsyncLogger, ListenableFuture<Void>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Void> future =
logger.doFinalize();
calls.put(logger, future);
}
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Boolean> canRollBack(StorageInfo storage,
StorageInfo prevStorage, int targetLayoutVersion) {
Map<AsyncLogger, ListenableFuture<Boolean>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Boolean> future =
logger.canRollBack(storage, prevStorage, targetLayoutVersion);
calls.put(logger, future);
}
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Void> doRollback() {
Map<AsyncLogger, ListenableFuture<Void>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Void> future =
logger.doRollback();
calls.put(logger, future);
}
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Long> getJournalCTime() {
Map<AsyncLogger, ListenableFuture<Long>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Long> future = logger.getJournalCTime();
calls.put(logger, future);
}
return QuorumCall.create(calls);
}
}

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolTranslatorPB;
import org.apache.hadoop.hdfs.qjournal.server.GetJournalEditServlet;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@ -575,6 +576,72 @@ public class IPCLoggerChannel implements AsyncLogger {
});
}
@Override
public ListenableFuture<Void> doPreUpgrade() {
return executor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
getProxy().doPreUpgrade(journalId);
return null;
}
});
}
@Override
public ListenableFuture<Void> doUpgrade(final StorageInfo sInfo) {
return executor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
getProxy().doUpgrade(journalId, sInfo);
return null;
}
});
}
@Override
public ListenableFuture<Void> doFinalize() {
return executor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
getProxy().doFinalize(journalId);
return null;
}
});
}
@Override
public ListenableFuture<Boolean> canRollBack(final StorageInfo storage,
final StorageInfo prevStorage, final int targetLayoutVersion) {
return executor.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws IOException {
return getProxy().canRollBack(journalId, storage, prevStorage,
targetLayoutVersion);
}
});
}
@Override
public ListenableFuture<Void> doRollback() {
return executor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
getProxy().doRollback(journalId);
return null;
}
});
}
@Override
public ListenableFuture<Long> getJournalCTime() {
return executor.submit(new Callable<Long>() {
@Override
public Long call() throws IOException {
return getProxy().getJournalCTime(journalId);
}
});
}
@Override
public String toString() {
return InetAddresses.toAddrString(addr.getAddress()) + ':' +
@ -646,4 +713,5 @@ public class IPCLoggerChannel implements AsyncLogger {
private boolean hasHttpServerEndPoint() {
return httpServerURL != null;
}
}

View File

@ -34,10 +34,13 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
@ -77,8 +80,14 @@ public class QuorumJournalManager implements JournalManager {
// Since these don't occur during normal operation, we can
// use rather lengthy timeouts, and don't need to make them
// configurable.
private static final int FORMAT_TIMEOUT_MS = 60000;
private static final int HASDATA_TIMEOUT_MS = 60000;
private static final int FORMAT_TIMEOUT_MS = 60000;
private static final int HASDATA_TIMEOUT_MS = 60000;
private static final int CAN_ROLL_BACK_TIMEOUT_MS = 60000;
private static final int FINALIZE_TIMEOUT_MS = 60000;
private static final int PRE_UPGRADE_TIMEOUT_MS = 60000;
private static final int ROLL_BACK_TIMEOUT_MS = 60000;
private static final int UPGRADE_TIMEOUT_MS = 60000;
private static final int GET_JOURNAL_CTIME_TIMEOUT_MS = 60000;
private static final int DISCARD_SEGMENTS_TIMEOUT_MS = 60000;
private final Configuration conf;
@ -495,6 +504,134 @@ public class QuorumJournalManager implements JournalManager {
return loggers;
}
@Override
public void doPreUpgrade() throws IOException {
QuorumCall<AsyncLogger, Void> call = loggers.doPreUpgrade();
try {
call.waitFor(loggers.size(), loggers.size(), 0, PRE_UPGRADE_TIMEOUT_MS,
"doPreUpgrade");
if (call.countExceptions() > 0) {
call.rethrowException("Could not do pre-upgrade of one or more JournalNodes");
}
} catch (InterruptedException e) {
throw new IOException("Interrupted waiting for doPreUpgrade() response");
} catch (TimeoutException e) {
throw new IOException("Timed out waiting for doPreUpgrade() response");
}
}
@Override
public void doUpgrade(Storage storage) throws IOException {
QuorumCall<AsyncLogger, Void> call = loggers.doUpgrade(storage);
try {
call.waitFor(loggers.size(), loggers.size(), 0, UPGRADE_TIMEOUT_MS,
"doUpgrade");
if (call.countExceptions() > 0) {
call.rethrowException("Could not perform upgrade of one or more JournalNodes");
}
} catch (InterruptedException e) {
throw new IOException("Interrupted waiting for doUpgrade() response");
} catch (TimeoutException e) {
throw new IOException("Timed out waiting for doUpgrade() response");
}
}
@Override
public void doFinalize() throws IOException {
QuorumCall<AsyncLogger, Void> call = loggers.doFinalize();
try {
call.waitFor(loggers.size(), loggers.size(), 0, FINALIZE_TIMEOUT_MS,
"doFinalize");
if (call.countExceptions() > 0) {
call.rethrowException("Could not finalize one or more JournalNodes");
}
} catch (InterruptedException e) {
throw new IOException("Interrupted waiting for doFinalize() response");
} catch (TimeoutException e) {
throw new IOException("Timed out waiting for doFinalize() response");
}
}
@Override
public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
int targetLayoutVersion) throws IOException {
QuorumCall<AsyncLogger, Boolean> call = loggers.canRollBack(storage,
prevStorage, targetLayoutVersion);
try {
call.waitFor(loggers.size(), loggers.size(), 0, CAN_ROLL_BACK_TIMEOUT_MS,
"lockSharedStorage");
if (call.countExceptions() > 0) {
call.rethrowException("Could not check if roll back possible for"
+ " one or more JournalNodes");
}
// Either they all return the same thing or this call fails, so we can
// just return the first result.
DFSUtil.assertAllResultsEqual(call.getResults().values());
for (Boolean result : call.getResults().values()) {
return result;
}
} catch (InterruptedException e) {
throw new IOException("Interrupted waiting for lockSharedStorage() " +
"response");
} catch (TimeoutException e) {
throw new IOException("Timed out waiting for lockSharedStorage() " +
"response");
}
throw new AssertionError("Unreachable code.");
}
@Override
public void doRollback() throws IOException {
QuorumCall<AsyncLogger, Void> call = loggers.doRollback();
try {
call.waitFor(loggers.size(), loggers.size(), 0, ROLL_BACK_TIMEOUT_MS,
"doRollback");
if (call.countExceptions() > 0) {
call.rethrowException("Could not perform rollback of one or more JournalNodes");
}
} catch (InterruptedException e) {
throw new IOException("Interrupted waiting for doFinalize() response");
} catch (TimeoutException e) {
throw new IOException("Timed out waiting for doFinalize() response");
}
}
@Override
public long getJournalCTime() throws IOException {
QuorumCall<AsyncLogger, Long> call = loggers.getJournalCTime();
try {
call.waitFor(loggers.size(), loggers.size(), 0,
GET_JOURNAL_CTIME_TIMEOUT_MS, "getJournalCTime");
if (call.countExceptions() > 0) {
call.rethrowException("Could not journal CTime for one "
+ "more JournalNodes");
}
// Either they all return the same thing or this call fails, so we can
// just return the first result.
DFSUtil.assertAllResultsEqual(call.getResults().values());
for (Long result : call.getResults().values()) {
return result;
}
} catch (InterruptedException e) {
throw new IOException("Interrupted waiting for getJournalCTime() " +
"response");
} catch (TimeoutException e) {
throw new IOException("Timed out waiting for getJournalCTime() " +
"response");
}
throw new AssertionError("Unreachable code.");
}
@Override
public void discardSegments(long startTxId) throws IOException {
QuorumCall<AsyncLogger, Void> call = loggers.discardSegments(startTxId);

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochR
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.retry.Idempotent;
@ -146,6 +147,19 @@ public interface QJournalProtocol {
public void acceptRecovery(RequestInfo reqInfo,
SegmentStateProto stateToAccept, URL fromUrl) throws IOException;
public void doPreUpgrade(String journalId) throws IOException;
public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException;
public void doFinalize(String journalId) throws IOException;
public Boolean canRollBack(String journalId, StorageInfo storage,
StorageInfo prevStorage, int targetLayoutVersion) throws IOException;
public void doRollback(String journalId) throws IOException;
public Long getJournalCTime(String journalId) throws IOException;
/**
* Discard journal segments whose first TxId is greater than or equal to the
* given txid.

View File

@ -28,14 +28,26 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DiscardSegmentsRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DiscardSegmentsResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoFinalizeRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoFinalizeResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoPreUpgradeRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoPreUpgradeResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoRollbackRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoRollbackResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoUpgradeRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoUpgradeResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
@ -54,6 +66,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogs
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
@ -263,4 +277,79 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
throw new ServiceException(e);
}
}
@Override
public DoPreUpgradeResponseProto doPreUpgrade(RpcController controller,
DoPreUpgradeRequestProto request) throws ServiceException {
try {
impl.doPreUpgrade(convert(request.getJid()));
return DoPreUpgradeResponseProto.getDefaultInstance();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public DoUpgradeResponseProto doUpgrade(RpcController controller,
DoUpgradeRequestProto request) throws ServiceException {
StorageInfo si = PBHelper.convert(request.getSInfo(), NodeType.JOURNAL_NODE);
try {
impl.doUpgrade(convert(request.getJid()), si);
return DoUpgradeResponseProto.getDefaultInstance();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public DoFinalizeResponseProto doFinalize(RpcController controller,
DoFinalizeRequestProto request) throws ServiceException {
try {
impl.doFinalize(convert(request.getJid()));
return DoFinalizeResponseProto.getDefaultInstance();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public CanRollBackResponseProto canRollBack(RpcController controller,
CanRollBackRequestProto request) throws ServiceException {
try {
StorageInfo si = PBHelper.convert(request.getStorage(), NodeType.JOURNAL_NODE);
Boolean result = impl.canRollBack(convert(request.getJid()), si,
PBHelper.convert(request.getPrevStorage(), NodeType.JOURNAL_NODE),
request.getTargetLayoutVersion());
return CanRollBackResponseProto.newBuilder()
.setCanRollBack(result)
.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public DoRollbackResponseProto doRollback(RpcController controller, DoRollbackRequestProto request)
throws ServiceException {
try {
impl.doRollback(convert(request.getJid()));
return DoRollbackResponseProto.getDefaultInstance();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public GetJournalCTimeResponseProto getJournalCTime(RpcController controller,
GetJournalCTimeRequestProto request) throws ServiceException {
try {
Long resultCTime = impl.getJournalCTime(convert(request.getJid()));
return GetJournalCTimeResponseProto.newBuilder()
.setResultCTime(resultCTime)
.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -28,11 +28,19 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DiscardSegmentsRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoFinalizeRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoPreUpgradeRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoRollbackRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoUpgradeRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
@ -49,6 +57,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.RequestIn
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.ipc.ProtobufHelper;
@ -279,6 +288,87 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
RPC.getProtocolVersion(QJournalProtocolPB.class), methodName);
}
@Override
public void doPreUpgrade(String jid) throws IOException {
try {
rpcProxy.doPreUpgrade(NULL_CONTROLLER,
DoPreUpgradeRequestProto.newBuilder()
.setJid(convertJournalId(jid))
.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException {
try {
rpcProxy.doUpgrade(NULL_CONTROLLER,
DoUpgradeRequestProto.newBuilder()
.setJid(convertJournalId(journalId))
.setSInfo(PBHelper.convert(sInfo))
.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void doFinalize(String jid) throws IOException {
try {
rpcProxy.doFinalize(NULL_CONTROLLER,
DoFinalizeRequestProto.newBuilder()
.setJid(convertJournalId(jid))
.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public Boolean canRollBack(String journalId, StorageInfo storage,
StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
try {
CanRollBackResponseProto response = rpcProxy.canRollBack(
NULL_CONTROLLER,
CanRollBackRequestProto.newBuilder()
.setJid(convertJournalId(journalId))
.setStorage(PBHelper.convert(storage))
.setPrevStorage(PBHelper.convert(prevStorage))
.setTargetLayoutVersion(targetLayoutVersion)
.build());
return response.getCanRollBack();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void doRollback(String journalId) throws IOException {
try {
rpcProxy.doRollback(NULL_CONTROLLER,
DoRollbackRequestProto.newBuilder()
.setJid(convertJournalId(journalId))
.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public Long getJournalCTime(String journalId) throws IOException {
try {
GetJournalCTimeResponseProto response = rpcProxy.getJournalCTime(
NULL_CONTROLLER,
GetJournalCTimeRequestProto.newBuilder()
.setJid(convertJournalId(journalId))
.build());
return response.getResultCTime();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void discardSegments(String journalId, long startTxId)
throws IOException {

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
@ -139,20 +140,26 @@ public class GetJournalEditServlet extends HttpServlet {
private boolean checkStorageInfoOrSendError(JNStorage storage,
HttpServletRequest request, HttpServletResponse response)
throws IOException {
String myStorageInfoString = storage.toColonSeparatedString();
int myNsId = storage.getNamespaceID();
String myClusterId = storage.getClusterID();
String theirStorageInfoString = StringEscapeUtils.escapeHtml(
request.getParameter(STORAGEINFO_PARAM));
if (theirStorageInfoString != null
&& !myStorageInfoString.equals(theirStorageInfoString)) {
String msg = "This node has storage info '" + myStorageInfoString
+ "' but the requesting node expected '"
+ theirStorageInfoString + "'";
response.sendError(HttpServletResponse.SC_FORBIDDEN, msg);
LOG.warn("Received an invalid request file transfer request from " +
request.getRemoteAddr() + ": " + msg);
return false;
if (theirStorageInfoString != null) {
int theirNsId = StorageInfo.getNsIdFromColonSeparatedString(
theirStorageInfoString);
String theirClusterId = StorageInfo.getClusterIdFromColonSeparatedString(
theirStorageInfoString);
if (myNsId != theirNsId || !myClusterId.equals(theirClusterId)) {
String msg = "This node has namespaceId '" + myNsId + " and clusterId '"
+ myClusterId + "' but the requesting node expected '" + theirNsId
+ "' and '" + theirClusterId + "'";
response.sendError(HttpServletResponse.SC_FORBIDDEN, msg);
LOG.warn("Received an invalid request file transfer request from " +
request.getRemoteAddr() + ": " + msg);
return false;
}
}
return true;
}

View File

@ -130,6 +130,10 @@ class JNStorage extends Storage {
return new File(sd.getCurrentDir(), "paxos");
}
File getRoot() {
return sd.getRoot();
}
/**
* Remove any log files and associated paxos files which are older than
* the given txid.
@ -182,12 +186,15 @@ class JNStorage extends Storage {
unlockAll();
sd.clearDirectory();
writeProperties(sd);
if (!getPaxosDir().mkdirs()) {
throw new IOException("Could not create paxos dir: " + getPaxosDir());
}
createPaxosDir();
analyzeStorage();
}
void createPaxosDir() throws IOException {
if (!getPaxosDir().mkdirs()) {
throw new IOException("Could not create paxos dir: " + getPaxosDir());
}
}
void analyzeStorage() throws IOException {
this.state = sd.analyzeStorage(StartupOption.REGULAR, this);

View File

@ -37,12 +37,14 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
@ -73,7 +75,7 @@ import com.google.protobuf.TextFormat;
* Each such journal is entirely independent despite being hosted by
* the same JVM.
*/
class Journal implements Closeable {
public class Journal implements Closeable {
static final Log LOG = LogFactory.getLog(Journal.class);
@ -122,8 +124,8 @@ class Journal implements Closeable {
*/
private BestEffortLongFile committedTxnId;
private static final String LAST_PROMISED_FILENAME = "last-promised-epoch";
private static final String LAST_WRITER_EPOCH = "last-writer-epoch";
public static final String LAST_PROMISED_FILENAME = "last-promised-epoch";
public static final String LAST_WRITER_EPOCH = "last-writer-epoch";
private static final String COMMITTED_TXID_FILENAME = "committed-txid";
private final FileJournalManager fjm;
@ -627,7 +629,7 @@ class Journal implements Closeable {
}
/**
* @see QJournalProtocol#getEditLogManifest(String, long)
* @see QJournalProtocol#getEditLogManifest(String, long, boolean)
*/
public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
boolean inProgressOk) throws IOException {
@ -729,7 +731,7 @@ class Journal implements Closeable {
}
/**
* @see QJournalProtocol#acceptRecovery(RequestInfo, SegmentStateProto, URL)
* @see QJournalProtocol#acceptRecovery(RequestInfo, QJournalProtocolProtos.SegmentStateProto, URL)
*/
public synchronized void acceptRecovery(RequestInfo reqInfo,
SegmentStateProto segment, URL fromUrl)
@ -987,4 +989,62 @@ class Journal implements Closeable {
// we delete all the segments after the startTxId. let's reset committedTxnId
committedTxnId.set(startTxId - 1);
}
public synchronized void doPreUpgrade() throws IOException {
storage.getJournalManager().doPreUpgrade();
}
public synchronized void doUpgrade(StorageInfo sInfo) throws IOException {
long oldCTime = storage.getCTime();
storage.cTime = sInfo.cTime;
int oldLV = storage.getLayoutVersion();
storage.layoutVersion = sInfo.layoutVersion;
LOG.info("Starting upgrade of edits directory: "
+ ".\n old LV = " + oldLV
+ "; old CTime = " + oldCTime
+ ".\n new LV = " + storage.getLayoutVersion()
+ "; new CTime = " + storage.getCTime());
storage.getJournalManager().doUpgrade(storage);
storage.createPaxosDir();
// Copy over the contents of the epoch data files to the new dir.
File currentDir = storage.getSingularStorageDir().getCurrentDir();
File previousDir = storage.getSingularStorageDir().getPreviousDir();
PersistentLongFile prevLastPromisedEpoch = new PersistentLongFile(
new File(previousDir, LAST_PROMISED_FILENAME), 0);
PersistentLongFile prevLastWriterEpoch = new PersistentLongFile(
new File(previousDir, LAST_WRITER_EPOCH), 0);
lastPromisedEpoch = new PersistentLongFile(
new File(currentDir, LAST_PROMISED_FILENAME), 0);
lastWriterEpoch = new PersistentLongFile(
new File(currentDir, LAST_WRITER_EPOCH), 0);
lastPromisedEpoch.set(prevLastPromisedEpoch.get());
lastWriterEpoch.set(prevLastWriterEpoch.get());
}
public synchronized void doFinalize() throws IOException {
LOG.info("Finalizing upgrade for journal "
+ storage.getRoot() + "."
+ (storage.getLayoutVersion()==0 ? "" :
"\n cur LV = " + storage.getLayoutVersion()
+ "; cur CTime = " + storage.getCTime()));
storage.getJournalManager().doFinalize();
}
public Boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
int targetLayoutVersion) throws IOException {
return this.storage.getJournalManager().canRollBack(storage, prevStorage,
targetLayoutVersion);
}
public void doRollback() throws IOException {
storage.getJournalManager().doRollback();
}
public Long getJournalCTime() throws IOException {
return storage.getJournalManager().getJournalCTime();
}
}

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
@ -290,4 +291,31 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
throws IOException {
getOrCreateJournal(journalId).discardSegments(startTxId);
}
public void doPreUpgrade(String journalId) throws IOException {
getOrCreateJournal(journalId).doPreUpgrade();
}
public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException {
getOrCreateJournal(journalId).doUpgrade(sInfo);
}
public void doFinalize(String journalId) throws IOException {
getOrCreateJournal(journalId).doFinalize();
}
public Boolean canRollBack(String journalId, StorageInfo storage,
StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
return getOrCreateJournal(journalId).canRollBack(storage, prevStorage,
targetLayoutVersion);
}
public void doRollback(String journalId) throws IOException {
getOrCreateJournal(journalId).doRollback();
}
public Long getJournalCTime(String journalId) throws IOException {
return getOrCreateJournal(journalId).getJournalCTime();
}
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentSt
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@ -205,6 +206,38 @@ class JournalNodeRpcServer implements QJournalProtocol {
.acceptRecovery(reqInfo, log, fromUrl);
}
@Override
public void doPreUpgrade(String journalId) throws IOException {
jn.doPreUpgrade(journalId);
}
@Override
public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException {
jn.doUpgrade(journalId, sInfo);
}
@Override
public void doFinalize(String journalId) throws IOException {
jn.doFinalize(journalId);
}
@Override
public Boolean canRollBack(String journalId, StorageInfo storage,
StorageInfo prevStorage, int targetLayoutVersion)
throws IOException {
return jn.canRollBack(journalId, storage, prevStorage, targetLayoutVersion);
}
@Override
public void doRollback(String journalId) throws IOException {
jn.doRollback(journalId);
}
@Override
public Long getJournalCTime(String journalId) throws IOException {
return jn.getJournalCTime(journalId);
}
@Override
public void discardSegments(String journalId, long startTxId)
throws IOException {

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.common;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
@ -78,7 +77,6 @@ public abstract class Storage extends StorageInfo {
public static final int[] LAYOUT_VERSIONS_203 = {-19, -31};
public static final String STORAGE_FILE_LOCK = "in_use.lock";
protected static final String STORAGE_FILE_VERSION = "VERSION";
public static final String STORAGE_DIR_CURRENT = "current";
public static final String STORAGE_DIR_PREVIOUS = "previous";
public static final String STORAGE_TMP_REMOVED = "removed.tmp";
@ -121,22 +119,24 @@ public abstract class Storage extends StorageInfo {
private class DirIterator implements Iterator<StorageDirectory> {
StorageDirType dirType;
boolean includeShared;
int prevIndex; // for remove()
int nextIndex; // for next()
DirIterator(StorageDirType dirType) {
DirIterator(StorageDirType dirType, boolean includeShared) {
this.dirType = dirType;
this.nextIndex = 0;
this.prevIndex = 0;
this.includeShared = includeShared;
}
@Override
public boolean hasNext() {
if (storageDirs.isEmpty() || nextIndex >= storageDirs.size())
return false;
if (dirType != null) {
if (dirType != null || !includeShared) {
while (nextIndex < storageDirs.size()) {
if (getStorageDir(nextIndex).getStorageDirType().isOfType(dirType))
if (shouldReturnNextDir())
break;
nextIndex++;
}
@ -151,9 +151,9 @@ public abstract class Storage extends StorageInfo {
StorageDirectory sd = getStorageDir(nextIndex);
prevIndex = nextIndex;
nextIndex++;
if (dirType != null) {
if (dirType != null || !includeShared) {
while (nextIndex < storageDirs.size()) {
if (getStorageDir(nextIndex).getStorageDirType().isOfType(dirType))
if (shouldReturnNextDir())
break;
nextIndex++;
}
@ -167,6 +167,12 @@ public abstract class Storage extends StorageInfo {
storageDirs.remove(prevIndex); // remove last returned element
hasNext(); // reset nextIndex to correct place
}
private boolean shouldReturnNextDir() {
StorageDirectory sd = getStorageDir(nextIndex);
return (dirType == null || sd.getStorageDirType().isOfType(dirType)) &&
(includeShared || !sd.isShared());
}
}
/**
@ -198,7 +204,27 @@ public abstract class Storage extends StorageInfo {
* them via the Iterator
*/
public Iterator<StorageDirectory> dirIterator(StorageDirType dirType) {
return new DirIterator(dirType);
return dirIterator(dirType, true);
}
/**
* Return all entries in storageDirs, potentially excluding shared dirs.
* @param includeShared whether or not to include shared dirs.
* @return an iterator over the configured storage dirs.
*/
public Iterator<StorageDirectory> dirIterator(boolean includeShared) {
return dirIterator(null, includeShared);
}
/**
* @param dirType all entries will be of this type of dir
* @param includeShared true to include any shared directories,
* false otherwise
* @return an iterator over the configured storage dirs.
*/
public Iterator<StorageDirectory> dirIterator(StorageDirType dirType,
boolean includeShared) {
return new DirIterator(dirType, includeShared);
}
public Iterable<StorageDirectory> dirIterable(final StorageDirType dirType) {
@ -228,7 +254,9 @@ public abstract class Storage extends StorageInfo {
@InterfaceAudience.Private
public static class StorageDirectory implements FormatConfirmable {
final File root; // root directory
final boolean useLock; // flag to enable storage lock
// whether or not this dir is shared between two separate NNs for HA, or
// between multiple block pools in the case of federation.
final boolean isShared;
final StorageDirType dirType; // storage dir type
FileLock lock; // storage lock
@ -236,11 +264,11 @@ public abstract class Storage extends StorageInfo {
public StorageDirectory(File dir) {
// default dirType is null
this(dir, null, true);
this(dir, null, false);
}
public StorageDirectory(File dir, StorageDirType dirType) {
this(dir, dirType, true);
this(dir, dirType, false);
}
public void setStorageUuid(String storageUuid) {
@ -255,14 +283,14 @@ public abstract class Storage extends StorageInfo {
* Constructor
* @param dir directory corresponding to the storage
* @param dirType storage directory type
* @param useLock true - enables locking on the storage directory and false
* disables locking
* @param isShared whether or not this dir is shared between two NNs. true
* disables locking on the storage directory, false enables locking
*/
public StorageDirectory(File dir, StorageDirType dirType, boolean useLock) {
public StorageDirectory(File dir, StorageDirType dirType, boolean isShared) {
this.root = dir;
this.lock = null;
this.dirType = dirType;
this.useLock = useLock;
this.isShared = isShared;
}
/**
@ -617,6 +645,10 @@ public abstract class Storage extends StorageInfo {
return true;
}
public boolean isShared() {
return isShared;
}
/**
* Lock storage to provide exclusive access.
@ -630,7 +662,7 @@ public abstract class Storage extends StorageInfo {
* @throws IOException if locking fails
*/
public void lock() throws IOException {
if (!useLock) {
if (isShared()) {
LOG.info("Locking is disabled");
return;
}
@ -900,23 +932,6 @@ public abstract class Storage extends StorageInfo {
props.setProperty("cTime", String.valueOf(cTime));
}
/**
* Read properties from the VERSION file in the given storage directory.
*/
public void readProperties(StorageDirectory sd) throws IOException {
Properties props = readPropertiesFile(sd.getVersionFile());
setFieldsFromProperties(props, sd);
}
/**
* Read properties from the the previous/VERSION file in the given storage directory.
*/
public void readPreviousVersionProperties(StorageDirectory sd)
throws IOException {
Properties props = readPropertiesFile(sd.getPreviousVersionFile());
setFieldsFromProperties(props, sd);
}
/**
* Write properties to the VERSION file in the given storage directory.
*/
@ -927,6 +942,11 @@ public abstract class Storage extends StorageInfo {
public void writeProperties(File to, StorageDirectory sd) throws IOException {
Properties props = new Properties();
setPropertiesFromFields(props, sd);
writeProperties(to, sd, props);
}
public static void writeProperties(File to, StorageDirectory sd,
Properties props) throws IOException {
RandomAccessFile file = new RandomAccessFile(to, "rws");
FileOutputStream out = null;
try {
@ -954,23 +974,6 @@ public abstract class Storage extends StorageInfo {
}
}
public static Properties readPropertiesFile(File from) throws IOException {
RandomAccessFile file = new RandomAccessFile(from, "rws");
FileInputStream in = null;
Properties props = new Properties();
try {
in = new FileInputStream(file.getFD());
file.seek(0);
props.load(in);
} finally {
if (in != null) {
in.close();
}
file.close();
}
return props;
}
public static void rename(File from, File to) throws IOException {
if (!from.renameTo(to))
throw new IOException("Failed to rename "

View File

@ -17,7 +17,10 @@
*/
package org.apache.hadoop.hdfs.server.common;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Map;
import java.util.Properties;
import java.util.SortedSet;
@ -208,4 +211,46 @@ public class StorageInfo {
}
return property;
}
public static int getNsIdFromColonSeparatedString(String in) {
return Integer.parseInt(in.split(":")[1]);
}
public static String getClusterIdFromColonSeparatedString(String in) {
return in.split(":")[3];
}
/**
* Read properties from the VERSION file in the given storage directory.
*/
public void readProperties(StorageDirectory sd) throws IOException {
Properties props = readPropertiesFile(sd.getVersionFile());
setFieldsFromProperties(props, sd);
}
/**
* Read properties from the the previous/VERSION file in the given storage directory.
*/
public void readPreviousVersionProperties(StorageDirectory sd)
throws IOException {
Properties props = readPropertiesFile(sd.getPreviousVersionFile());
setFieldsFromProperties(props, sd);
}
public static Properties readPropertiesFile(File from) throws IOException {
RandomAccessFile file = new RandomAccessFile(from, "rws");
FileInputStream in = null;
Properties props = new Properties();
try {
in = new FileInputStream(file.getFD());
file.seek(0);
props.load(in);
} finally {
if (in != null) {
in.close();
}
file.close();
}
return props;
}
}

View File

@ -111,7 +111,7 @@ public class BlockPoolSliceStorage extends Storage {
dataDirs.size());
for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
File dataDir = it.next();
StorageDirectory sd = new StorageDirectory(dataDir, null, false);
StorageDirectory sd = new StorageDirectory(dataDir, null, true);
StorageState curState;
try {
curState = sd.analyzeStorage(startOpt, this);

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@ -103,4 +105,35 @@ class BackupJournalManager implements JournalManager {
public void discardSegments(long startTxId) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void doPreUpgrade() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void doUpgrade(Storage storage) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void doFinalize() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
int targetLayoutVersion) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void doRollback() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public long getJournalCTime() throws IOException {
throw new UnsupportedOperationException();
}
}

View File

@ -415,7 +415,8 @@ public class BackupNode extends NameNode {
return DFSUtil.getBackupNameServiceId(conf);
}
protected HAState createHAState() {
@Override
protected HAState createHAState(StartupOption startOpt) {
return new BackupState();
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddBlockOp;
@ -72,6 +73,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetAclOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
@ -83,7 +85,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@ -256,10 +257,12 @@ public class FSEditLog implements LogsPurgeable {
if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
StorageDirectory sd = storage.getStorageDirectory(u);
if (sd != null) {
journalSet.add(new FileJournalManager(conf, sd, storage), required);
journalSet.add(new FileJournalManager(conf, sd, storage),
required, sharedEditsDirs.contains(u));
}
} else {
journalSet.add(createJournal(u), required);
journalSet.add(createJournal(u), required,
sharedEditsDirs.contains(u));
}
}
@ -1331,6 +1334,58 @@ public class FSEditLog implements LogsPurgeable {
}
}
public long getSharedLogCTime() throws IOException {
for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
if (jas.isShared()) {
return jas.getManager().getJournalCTime();
}
}
throw new IOException("No shared log found.");
}
public synchronized void doPreUpgradeOfSharedLog() throws IOException {
for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
if (jas.isShared()) {
jas.getManager().doPreUpgrade();
}
}
}
public synchronized void doUpgradeOfSharedLog() throws IOException {
for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
if (jas.isShared()) {
jas.getManager().doUpgrade(storage);
}
}
}
public synchronized void doFinalizeOfSharedLog() throws IOException {
for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
if (jas.isShared()) {
jas.getManager().doFinalize();
}
}
}
public synchronized boolean canRollBackSharedLog(Storage prevStorage,
int targetLayoutVersion) throws IOException {
for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
if (jas.isShared()) {
return jas.getManager().canRollBack(storage, prevStorage,
targetLayoutVersion);
}
}
throw new IOException("No shared log found.");
}
public synchronized void doRollback() throws IOException {
for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
if (jas.isShared()) {
jas.getManager().doRollback();
}
}
}
public synchronized void discardSegments(long markerTxid)
throws IOException {
for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
@ -1469,4 +1524,5 @@ public class FSEditLog implements LogsPurgeable {
+ uri, e);
}
}
}

View File

@ -181,7 +181,8 @@ public class FSImage implements Closeable {
* @return true if the image needs to be saved or false otherwise
*/
boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,
MetaRecoveryContext recovery) throws IOException {
MetaRecoveryContext recovery)
throws IOException {
assert startOpt != StartupOption.FORMAT :
"NameNode formatting should be performed before reading the image";
@ -260,8 +261,8 @@ public class FSImage implements Closeable {
doImportCheckpoint(target);
return false; // import checkpoint saved image already
case ROLLBACK:
doRollback();
break;
throw new AssertionError("Rollback is now a standalone command, "
+ "NameNode should not be starting with this option.");
case REGULAR:
default:
// just load the image
@ -280,17 +281,15 @@ public class FSImage implements Closeable {
private boolean recoverStorageDirs(StartupOption startOpt,
Map<StorageDirectory, StorageState> dataDirStates) throws IOException {
boolean isFormatted = false;
// This loop needs to be over all storage dirs, even shared dirs, to make
// sure that we properly examine their state, but we make sure we don't
// mutate the shared dir below in the actual loop.
for (Iterator<StorageDirectory> it =
storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
StorageState curState;
try {
curState = sd.analyzeStorage(startOpt, storage);
String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
if (curState != StorageState.NORMAL && HAUtil.isHAEnabled(conf, nameserviceId)) {
throw new IOException("Cannot start an HA namenode with name dirs " +
"that need recovery. Dir: " + sd + " state: " + curState);
}
// sd is locked but not opened
switch(curState) {
case NON_EXISTENT:
@ -327,7 +326,7 @@ public class FSImage implements Closeable {
void checkUpgrade(FSNamesystem target) throws IOException {
// Upgrade or rolling upgrade is allowed only if there are
// no previous fs states in any of the directories
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
StorageDirectory sd = it.next();
if (sd.getPreviousDir().exists())
throw new InconsistentFSStateException(sd.getRoot(),
@ -356,7 +355,7 @@ public class FSImage implements Closeable {
checkUpgrade(target);
// load the latest image
this.loadFSImage(target, null, null);
this.loadFSImage(target, StartupOption.UPGRADE, null);
// Do upgrade for each directory
target.checkRollingUpgrade("upgrade namenode");
@ -368,28 +367,17 @@ public class FSImage implements Closeable {
List<StorageDirectory> errorSDs =
Collections.synchronizedList(new ArrayList<StorageDirectory>());
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
assert !editLog.isSegmentOpen() : "Edits log must not be open.";
LOG.info("Starting upgrade of local storage directories."
+ "\n old LV = " + oldLV
+ "; old CTime = " + oldCTime
+ ".\n new LV = " + storage.getLayoutVersion()
+ "; new CTime = " + storage.getCTime());
// Do upgrade for each directory
for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
StorageDirectory sd = it.next();
LOG.info("Starting upgrade of image directory " + sd.getRoot()
+ ".\n old LV = " + oldLV
+ "; old CTime = " + oldCTime
+ ".\n new LV = " + storage.getLayoutVersion()
+ "; new CTime = " + storage.getCTime());
try {
File curDir = sd.getCurrentDir();
File prevDir = sd.getPreviousDir();
File tmpDir = sd.getPreviousTmp();
assert curDir.exists() : "Current directory must exist.";
assert !prevDir.exists() : "previous directory must not exist.";
assert !tmpDir.exists() : "previous.tmp directory must not exist.";
assert !editLog.isSegmentOpen() : "Edits log must not be open.";
// rename current to tmp
NNStorage.rename(curDir, tmpDir);
if (!curDir.mkdir()) {
throw new IOException("Cannot create directory " + curDir);
}
NNUpgradeUtil.doPreUpgrade(sd);
} catch (Exception e) {
LOG.error("Failed to move aside pre-upgrade storage " +
"in image directory " + sd.getRoot(), e);
@ -397,41 +385,38 @@ public class FSImage implements Closeable {
continue;
}
}
if (target.isHaEnabled()) {
editLog.doPreUpgradeOfSharedLog();
}
storage.reportErrorsOnDirectories(errorSDs);
errorSDs.clear();
saveFSImageInAllDirs(target, editLog.getLastWrittenTxId());
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
StorageDirectory sd = it.next();
try {
// Write the version file, since saveFsImage above only makes the
// fsimage_<txid>, and the directory is otherwise empty.
storage.writeProperties(sd);
File prevDir = sd.getPreviousDir();
File tmpDir = sd.getPreviousTmp();
// rename tmp to previous
NNStorage.rename(tmpDir, prevDir);
NNUpgradeUtil.doUpgrade(sd, storage);
} catch (IOException ioe) {
LOG.error("Unable to rename temp to previous for " + sd.getRoot(), ioe);
errorSDs.add(sd);
continue;
}
LOG.info("Upgrade of " + sd.getRoot() + " is complete.");
}
if (target.isHaEnabled()) {
editLog.doUpgradeOfSharedLog();
}
storage.reportErrorsOnDirectories(errorSDs);
isUpgradeFinalized = false;
if (!storage.getRemovedStorageDirs().isEmpty()) {
//during upgrade, it's a fatal error to fail any storage directory
// during upgrade, it's a fatal error to fail any storage directory
throw new IOException("Upgrade failed in "
+ storage.getRemovedStorageDirs().size()
+ " storage directory(ies), previously logged.");
}
}
private void doRollback() throws IOException {
void doRollback(FSNamesystem fsns) throws IOException {
// Rollback is allowed only if there is
// a previous fs states in at least one of the storage directories.
// Directories that don't have previous state do not rollback
@ -439,85 +424,46 @@ public class FSImage implements Closeable {
FSImage prevState = new FSImage(conf);
try {
prevState.getStorage().layoutVersion = HdfsConstants.NAMENODE_LAYOUT_VERSION;
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
StorageDirectory sd = it.next();
File prevDir = sd.getPreviousDir();
if (!prevDir.exists()) { // use current directory then
LOG.info("Storage directory " + sd.getRoot()
+ " does not contain previous fs state.");
// read and verify consistency with other directories
storage.readProperties(sd);
if (!NNUpgradeUtil.canRollBack(sd, storage, prevState.getStorage(),
HdfsConstants.NAMENODE_LAYOUT_VERSION)) {
continue;
}
// read and verify consistency of the prev dir
prevState.getStorage().readPreviousVersionProperties(sd);
if (prevState.getLayoutVersion() != HdfsConstants.NAMENODE_LAYOUT_VERSION) {
throw new IOException(
"Cannot rollback to storage version " +
prevState.getLayoutVersion() +
" using this version of the NameNode, which uses storage version " +
HdfsConstants.NAMENODE_LAYOUT_VERSION + ". " +
"Please use the previous version of HDFS to perform the rollback.");
}
canRollback = true;
}
if (fsns.isHaEnabled()) {
// If HA is enabled, check if the shared log can be rolled back as well.
editLog.initJournalsForWrite();
canRollback |= editLog.canRollBackSharedLog(prevState.getStorage(),
HdfsConstants.NAMENODE_LAYOUT_VERSION);
}
if (!canRollback)
throw new IOException("Cannot rollback. None of the storage "
+ "directories contain previous fs state.");
// Now that we know all directories are going to be consistent
// Do rollback for each directory containing previous state
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
StorageDirectory sd = it.next();
File prevDir = sd.getPreviousDir();
if (!prevDir.exists())
continue;
LOG.info("Rolling back storage directory " + sd.getRoot()
+ ".\n new LV = " + prevState.getStorage().getLayoutVersion()
+ "; new CTime = " + prevState.getStorage().getCTime());
File tmpDir = sd.getRemovedTmp();
assert !tmpDir.exists() : "removed.tmp directory must not exist.";
// rename current to tmp
File curDir = sd.getCurrentDir();
assert curDir.exists() : "Current directory must exist.";
NNStorage.rename(curDir, tmpDir);
// rename previous to current
NNStorage.rename(prevDir, curDir);
// delete tmp dir
NNStorage.deleteDir(tmpDir);
LOG.info("Rollback of " + sd.getRoot()+ " is complete.");
+ ".\n new LV = " + prevState.getStorage().getLayoutVersion()
+ "; new CTime = " + prevState.getStorage().getCTime());
NNUpgradeUtil.doRollBack(sd);
}
if (fsns.isHaEnabled()) {
// If HA is enabled, try to roll back the shared log as well.
editLog.doRollback();
}
isUpgradeFinalized = true;
} finally {
prevState.close();
}
}
private void doFinalize(StorageDirectory sd) throws IOException {
File prevDir = sd.getPreviousDir();
if (!prevDir.exists()) { // already discarded
LOG.info("Directory " + prevDir + " does not exist.");
LOG.info("Finalize upgrade for " + sd.getRoot()+ " is not required.");
return;
}
LOG.info("Finalizing upgrade for storage directory "
+ sd.getRoot() + "."
+ (storage.getLayoutVersion()==0 ? "" :
"\n cur LV = " + storage.getLayoutVersion()
+ "; cur CTime = " + storage.getCTime()));
assert sd.getCurrentDir().exists() : "Current directory must exist.";
final File tmpDir = sd.getFinalizedTmp();
// rename previous to tmp and remove
NNStorage.rename(prevDir, tmpDir);
NNStorage.deleteDir(tmpDir);
isUpgradeFinalized = true;
LOG.info("Finalize upgrade for " + sd.getRoot()+ " is complete.");
}
/**
* Load image from a checkpoint directory and save it into the current one.
* @param target the NameSystem to import into
@ -562,11 +508,22 @@ public class FSImage implements Closeable {
getStorage().writeAll();
}
void finalizeUpgrade() throws IOException {
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
void finalizeUpgrade(boolean finalizeEditLog) throws IOException {
LOG.info("Finalizing upgrade for local dirs. " +
(storage.getLayoutVersion() == 0 ? "" :
"\n cur LV = " + storage.getLayoutVersion()
+ "; cur CTime = " + storage.getCTime()));
for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
StorageDirectory sd = it.next();
doFinalize(sd);
NNUpgradeUtil.doFinalize(sd);
}
if (finalizeEditLog) {
// We only do this in the case that HA is enabled and we're active. In any
// other case the NN will have done the upgrade of the edits directories
// already by virtue of the fact that they're local.
editLog.doFinalizeOfSharedLog();
}
isUpgradeFinalized = true;
}
boolean isUpgradeFinalized() {
@ -763,18 +720,33 @@ public class FSImage implements Closeable {
}
}
public void initEditLog(StartupOption startOpt) {
public void initEditLog(StartupOption startOpt) throws IOException {
Preconditions.checkState(getNamespaceID() != 0,
"Must know namespace ID before initting edit log");
String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
if (!HAUtil.isHAEnabled(conf, nameserviceId) ||
(HAUtil.isHAEnabled(conf, nameserviceId) &&
RollingUpgradeStartupOption.ROLLBACK.matches(startOpt))) {
// If this NN is not HA or this NN is HA, but we're doing a rollback of
// rolling upgrade so init the edit log for write.
if (!HAUtil.isHAEnabled(conf, nameserviceId)) {
// If this NN is not HA
editLog.initJournalsForWrite();
editLog.recoverUnclosedStreams();
} else if (HAUtil.isHAEnabled(conf, nameserviceId)
&& (startOpt == StartupOption.UPGRADE
|| RollingUpgradeStartupOption.ROLLBACK.matches(startOpt))) {
// This NN is HA, but we're doing an upgrade or a rollback of rolling
// upgrade so init the edit log for write.
editLog.initJournalsForWrite();
if (startOpt == StartupOption.UPGRADE) {
long sharedLogCTime = editLog.getSharedLogCTime();
if (this.storage.getCTime() < sharedLogCTime) {
throw new IOException("It looks like the shared log is already " +
"being upgraded but this NN has not been upgraded yet. You " +
"should restart this NameNode with the '" +
StartupOption.BOOTSTRAPSTANDBY.getName() + "' option to bring " +
"this NN in sync with the other.");
}
}
editLog.recoverUnclosedStreams();
} else {
// This NN is HA and we're not doing an upgrade.
editLog.initSharedJournalsForRead();
}
}

View File

@ -159,6 +159,8 @@ import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -170,8 +172,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
@ -556,6 +556,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return leaseManager;
}
boolean isHaEnabled() {
return haEnabled;
}
/**
* Check the supplied configuration for correctness.
* @param conf Supplies the configuration to validate.
@ -891,7 +895,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
// This will start a new log segment and write to the seen_txid file, so
// we shouldn't do it when coming up in standby state
if (!haEnabled) {
if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)) {
fsImage.openEditLogForWrite();
}
success = true;
@ -1017,6 +1021,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
dir.fsImage.editLog.openForWrite();
}
if (haEnabled) {
// Renew all of the leases before becoming active.
// This is because, while we were in standby mode,
@ -1052,14 +1057,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
initializedReplQueues = true;
}
private boolean inActiveState() {
return haContext != null &&
haContext.getState().getServiceState() == HAServiceState.ACTIVE;
}
/**
* @return Whether the namenode is transitioning to active state and is in the
* middle of the {@link #startActiveServices()}
*/
public boolean inTransitionToActive() {
return haEnabled && haContext != null
&& haContext.getState().getServiceState() == HAServiceState.ACTIVE
&& startingActiveService;
return haEnabled && inActiveState() && startingActiveService;
}
private boolean shouldUseDelegationTokens() {
@ -4587,11 +4595,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
void finalizeUpgrade() throws IOException {
checkSuperuserPrivilege();
checkOperation(OperationCategory.WRITE);
checkOperation(OperationCategory.UNCHECKED);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
getFSImage().finalizeUpgrade();
checkOperation(OperationCategory.UNCHECKED);
getFSImage().finalizeUpgrade(this.isHaEnabled() && inActiveState());
} finally {
writeUnlock();
}
@ -7758,5 +7766,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
logger.addAppender(asyncAppender);
}
}
}

View File

@ -17,35 +17,37 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Comparator;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
/**
* Journal manager for the common case of edits files being written
@ -531,4 +533,49 @@ public class FileJournalManager implements JournalManager {
public void discardSegments(long startTxid) throws IOException {
discardEditLogSegments(startTxid);
}
@Override
public void doPreUpgrade() throws IOException {
LOG.info("Starting upgrade of edits directory " + sd.getRoot());
try {
NNUpgradeUtil.doPreUpgrade(sd);
} catch (IOException ioe) {
LOG.error("Failed to move aside pre-upgrade storage " +
"in image directory " + sd.getRoot(), ioe);
throw ioe;
}
}
/**
* This method assumes that the fields of the {@link Storage} object have
* already been updated to the appropriate new values for the upgrade.
*/
@Override
public void doUpgrade(Storage storage) throws IOException {
NNUpgradeUtil.doUpgrade(sd, storage);
}
@Override
public void doFinalize() throws IOException {
NNUpgradeUtil.doFinalize(sd);
}
@Override
public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
int targetLayoutVersion) throws IOException {
return NNUpgradeUtil.canRollBack(sd, storage,
prevStorage, targetLayoutVersion);
}
@Override
public void doRollback() throws IOException {
NNUpgradeUtil.doRollBack(sd);
}
@Override
public long getJournalCTime() throws IOException {
StorageInfo sInfo = new StorageInfo((NodeType)null);
sInfo.readProperties(sd);
return sInfo.getCTime();
}
}

View File

@ -22,7 +22,9 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
/**
@ -66,6 +68,54 @@ public interface JournalManager extends Closeable, LogsPurgeable,
*/
void recoverUnfinalizedSegments() throws IOException;
/**
* Perform any steps that must succeed across all JournalManagers involved in
* an upgrade before proceeding onto the actual upgrade stage. If a call to
* any JM's doPreUpgrade method fails, then doUpgrade will not be called for
* any JM.
*/
void doPreUpgrade() throws IOException;
/**
* Perform the actual upgrade of the JM. After this is completed, the NN can
* begin to use the new upgraded metadata. This metadata may later be either
* finalized or rolled back to the previous state.
*
* @param storage info about the new upgraded versions.
*/
void doUpgrade(Storage storage) throws IOException;
/**
* Finalize the upgrade. JMs should purge any state that they had been keeping
* around during the upgrade process. After this is completed, rollback is no
* longer allowed.
*/
void doFinalize() throws IOException;
/**
* Return true if this JM can roll back to the previous storage state, false
* otherwise. The NN will refuse to run the rollback operation unless at least
* one JM or fsimage storage directory can roll back.
*
* @param storage the storage info for the current state
* @param prevStorage the storage info for the previous (unupgraded) state
* @param targetLayoutVersion the layout version we intend to roll back to
* @return true if this JM can roll back, false otherwise.
*/
boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
int targetLayoutVersion) throws IOException;
/**
* Perform the rollback to the previous FS state. JMs which do not need to
* roll back their state should just return without error.
*/
void doRollback() throws IOException;
/**
* @return the CTime of the journal manager.
*/
long getJournalCTime() throws IOException;
/**
* Discard the segments whose first txid is >= the given txid.
* @param startTxId The given txid should be right at the segment boundary,
@ -93,4 +143,5 @@ public interface JournalManager extends Closeable, LogsPurgeable,
super(reason);
}
}
}

View File

@ -33,6 +33,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@ -78,11 +80,14 @@ public class JournalSet implements JournalManager {
private final JournalManager journal;
private boolean disabled = false;
private EditLogOutputStream stream;
private boolean required = false;
private final boolean required;
private final boolean shared;
public JournalAndStream(JournalManager manager, boolean required) {
public JournalAndStream(JournalManager manager, boolean required,
boolean shared) {
this.journal = manager;
this.required = required;
this.shared = shared;
}
public void startLogSegment(long txId, int layoutVersion) throws IOException {
@ -164,6 +169,10 @@ public class JournalSet implements JournalManager {
public boolean isRequired() {
return required;
}
public boolean isShared() {
return shared;
}
}
// COW implementation is necessary since some users (eg the web ui) call
@ -179,7 +188,7 @@ public class JournalSet implements JournalManager {
@Override
public void format(NamespaceInfo nsInfo) throws IOException {
// The iteration is done by FSEditLog itself
// The operation is done by FSEditLog itself
throw new UnsupportedOperationException();
}
@ -541,7 +550,11 @@ public class JournalSet implements JournalManager {
}
void add(JournalManager j, boolean required) {
JournalAndStream jas = new JournalAndStream(j, required);
add(j, required, false);
}
void add(JournalManager j, boolean required, boolean shared) {
JournalAndStream jas = new JournalAndStream(j, required, shared);
journals.add(jas);
}
@ -663,4 +676,40 @@ public class JournalSet implements JournalManager {
// This operation is handled by FSEditLog directly.
throw new UnsupportedOperationException();
}
@Override
public void doPreUpgrade() throws IOException {
// This operation is handled by FSEditLog directly.
throw new UnsupportedOperationException();
}
@Override
public void doUpgrade(Storage storage) throws IOException {
// This operation is handled by FSEditLog directly.
throw new UnsupportedOperationException();
}
@Override
public void doFinalize() throws IOException {
// This operation is handled by FSEditLog directly.
throw new UnsupportedOperationException();
}
@Override
public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
// This operation is handled by FSEditLog directly.
throw new UnsupportedOperationException();
}
@Override
public void doRollback() throws IOException {
// This operation is handled by FSEditLog directly.
throw new UnsupportedOperationException();
}
@Override
public long getJournalCTime() throws IOException {
// This operation is handled by FSEditLog directly.
throw new UnsupportedOperationException();
}
}

View File

@ -301,7 +301,7 @@ public class NNStorage extends Storage implements Closeable,
if(dirName.getScheme().compareTo("file") == 0) {
this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
dirType,
!sharedEditsDirs.contains(dirName))); // Don't lock the dir if it's shared.
sharedEditsDirs.contains(dirName))); // Don't lock the dir if it's shared.
}
}
@ -312,7 +312,7 @@ public class NNStorage extends Storage implements Closeable,
// URI is of type file://
if(dirName.getScheme().compareTo("file") == 0)
this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
NameNodeDirType.EDITS, !sharedEditsDirs.contains(dirName)));
NameNodeDirType.EDITS, sharedEditsDirs.contains(dirName)));
}
}
@ -1007,7 +1007,7 @@ public class NNStorage extends Storage implements Closeable,
StringBuilder layoutVersions = new StringBuilder();
// First determine what range of layout versions we're going to inspect
for (Iterator<StorageDirectory> it = dirIterator();
for (Iterator<StorageDirectory> it = dirIterator(false);
it.hasNext();) {
StorageDirectory sd = it.next();
if (!sd.getVersionFile().exists()) {

View File

@ -0,0 +1,174 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.File;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
abstract class NNUpgradeUtil {
private static final Log LOG = LogFactory.getLog(NNUpgradeUtil.class);
/**
* Return true if this storage dir can roll back to the previous storage
* state, false otherwise. The NN will refuse to run the rollback operation
* unless at least one JM or fsimage storage directory can roll back.
*
* @param storage the storage info for the current state
* @param prevStorage the storage info for the previous (unupgraded) state
* @param targetLayoutVersion the layout version we intend to roll back to
* @return true if this JM can roll back, false otherwise.
* @throws IOException in the event of error
*/
static boolean canRollBack(StorageDirectory sd, StorageInfo storage,
StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
File prevDir = sd.getPreviousDir();
if (!prevDir.exists()) { // use current directory then
LOG.info("Storage directory " + sd.getRoot()
+ " does not contain previous fs state.");
// read and verify consistency with other directories
storage.readProperties(sd);
return false;
}
// read and verify consistency of the prev dir
prevStorage.readPreviousVersionProperties(sd);
if (prevStorage.getLayoutVersion() != targetLayoutVersion) {
throw new IOException(
"Cannot rollback to storage version " +
prevStorage.getLayoutVersion() +
" using this version of the NameNode, which uses storage version " +
targetLayoutVersion + ". " +
"Please use the previous version of HDFS to perform the rollback.");
}
return true;
}
/**
* Finalize the upgrade. The previous dir, if any, will be renamed and
* removed. After this is completed, rollback is no longer allowed.
*
* @param sd the storage directory to finalize
* @throws IOException in the event of error
*/
static void doFinalize(StorageDirectory sd) throws IOException {
File prevDir = sd.getPreviousDir();
if (!prevDir.exists()) { // already discarded
LOG.info("Directory " + prevDir + " does not exist.");
LOG.info("Finalize upgrade for " + sd.getRoot()+ " is not required.");
return;
}
LOG.info("Finalizing upgrade of storage directory " + sd.getRoot());
assert sd.getCurrentDir().exists() : "Current directory must exist.";
final File tmpDir = sd.getFinalizedTmp();
// rename previous to tmp and remove
NNStorage.rename(prevDir, tmpDir);
NNStorage.deleteDir(tmpDir);
LOG.info("Finalize upgrade for " + sd.getRoot()+ " is complete.");
}
/**
* Perform any steps that must succeed across all storage dirs/JournalManagers
* involved in an upgrade before proceeding onto the actual upgrade stage. If
* a call to any JM's or local storage dir's doPreUpgrade method fails, then
* doUpgrade will not be called for any JM. The existing current dir is
* renamed to previous.tmp, and then a new, empty current dir is created.
*
* @param sd the storage directory to perform the pre-upgrade procedure.
* @throws IOException in the event of error
*/
static void doPreUpgrade(StorageDirectory sd) throws IOException {
LOG.info("Starting upgrade of storage directory " + sd.getRoot());
File curDir = sd.getCurrentDir();
File prevDir = sd.getPreviousDir();
File tmpDir = sd.getPreviousTmp();
assert curDir.exists() : "Current directory must exist.";
assert !prevDir.exists() : "previous directory must not exist.";
assert !tmpDir.exists() : "previous.tmp directory must not exist.";
// rename current to tmp
NNStorage.rename(curDir, tmpDir);
if (!curDir.mkdir()) {
throw new IOException("Cannot create directory " + curDir);
}
}
/**
* Perform the upgrade of the storage dir to the given storage info. The new
* storage info is written into the current directory, and the previous.tmp
* directory is renamed to previous.
*
* @param sd the storage directory to upgrade
* @param storage info about the new upgraded versions.
* @throws IOException in the event of error
*/
static void doUpgrade(StorageDirectory sd, Storage storage) throws
IOException {
LOG.info("Performing upgrade of storage directory " + sd.getRoot());
try {
// Write the version file, since saveFsImage only makes the
// fsimage_<txid>, and the directory is otherwise empty.
storage.writeProperties(sd);
File prevDir = sd.getPreviousDir();
File tmpDir = sd.getPreviousTmp();
// rename tmp to previous
NNStorage.rename(tmpDir, prevDir);
} catch (IOException ioe) {
LOG.error("Unable to rename temp to previous for " + sd.getRoot(), ioe);
throw ioe;
}
}
/**
* Perform rollback of the storage dir to the previous state. The existing
* current dir is removed, and the previous dir is renamed to current.
*
* @param sd the storage directory to roll back.
* @throws IOException in the event of error
*/
static void doRollBack(StorageDirectory sd)
throws IOException {
File prevDir = sd.getPreviousDir();
if (!prevDir.exists())
return;
File tmpDir = sd.getRemovedTmp();
assert !tmpDir.exists() : "removed.tmp directory must not exist.";
// rename current to tmp
File curDir = sd.getCurrentDir();
assert curDir.exists() : "Current directory must exist.";
NNStorage.rename(curDir, tmpDir);
// rename previous to current
NNStorage.rename(prevDir, curDir);
// delete tmp dir
NNStorage.deleteDir(tmpDir);
LOG.info("Rollback of " + sd.getRoot() + " is complete.");
}
}

View File

@ -662,7 +662,7 @@ public class NameNode implements NameNodeStatusMXBean {
String nsId = getNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
state = createHAState();
state = createHAState(getStartupOption(conf));
this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
this.haContext = createHAContext();
try {
@ -684,8 +684,12 @@ public class NameNode implements NameNodeStatusMXBean {
}
}
protected HAState createHAState() {
return !haEnabled ? ACTIVE_STATE : STANDBY_STATE;
protected HAState createHAState(StartupOption startOpt) {
if (!haEnabled || startOpt == StartupOption.UPGRADE) {
return ACTIVE_STATE;
} else {
return STANDBY_STATE;
}
}
protected HAContext createHAContext() {
@ -1038,25 +1042,27 @@ public class NameNode implements NameNodeStatusMXBean {
}
}
private static boolean finalize(Configuration conf,
boolean isConfirmationNeeded
) throws IOException {
@VisibleForTesting
public static boolean doRollback(Configuration conf,
boolean isConfirmationNeeded) throws IOException {
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
initializeGenericKeys(conf, nsId, namenodeId);
FSNamesystem nsys = new FSNamesystem(conf, new FSImage(conf));
System.err.print(
"\"finalize\" will remove the previous state of the files system.\n"
+ "Recent upgrade will become permanent.\n"
+ "Rollback option will not be available anymore.\n");
"\"rollBack\" will remove the current state of the file system,\n"
+ "returning you to the state prior to initiating your recent.\n"
+ "upgrade. This action is permanent and cannot be undone. If you\n"
+ "are performing a rollback in an HA environment, you should be\n"
+ "certain that no NameNode process is running on any host.");
if (isConfirmationNeeded) {
if (!confirmPrompt("Finalize filesystem state?")) {
System.err.println("Finalize aborted.");
if (!confirmPrompt("Roll back file system state?")) {
System.err.println("Rollback aborted.");
return true;
}
}
nsys.dir.fsImage.finalizeUpgrade();
nsys.dir.fsImage.doRollback(nsys);
return false;
}
@ -1245,14 +1251,6 @@ public class NameNode implements NameNodeStatusMXBean {
}
setStartupOption(conf, startOpt);
if (HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf)) &&
(startOpt == StartupOption.UPGRADE ||
startOpt == StartupOption.ROLLBACK ||
startOpt == StartupOption.FINALIZE)) {
throw new HadoopIllegalArgumentException("Invalid startup option. " +
"Cannot perform DFS upgrade with HA enabled.");
}
switch (startOpt) {
case FORMAT: {
boolean aborted = format(conf, startOpt.getForceFormat(),
@ -1267,10 +1265,17 @@ public class NameNode implements NameNodeStatusMXBean {
return null;
}
case FINALIZE: {
boolean aborted = finalize(conf, true);
terminate(aborted ? 1 : 0);
System.err.println("Use of the argument '" + StartupOption.FINALIZE +
"' is no longer supported. To finalize an upgrade, start the NN " +
" and then run `hdfs dfsadmin -finalizeUpgrade'");
terminate(1);
return null; // avoid javac warning
}
case ROLLBACK: {
boolean aborted = doRollback(conf, true);
terminate(aborted ? 1 : 0);
return null; // avoid warning
}
case BOOTSTRAPSTANDBY: {
String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
int rc = BootstrapStandby.run(toolArgs, conf);

View File

@ -44,9 +44,9 @@ import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.tools.DFSHAAdmin;

View File

@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@ -47,6 +48,7 @@ import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -871,7 +873,24 @@ public class DFSAdmin extends FsShell {
*/
public int finalizeUpgrade() throws IOException {
DistributedFileSystem dfs = getDFS();
dfs.finalizeUpgrade();
Configuration dfsConf = dfs.getConf();
URI dfsUri = dfs.getUri();
boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri);
if (isHaEnabled) {
// In the case of HA, run finalizeUpgrade for all NNs in this nameservice
String nsId = dfsUri.getHost();
List<ClientProtocol> namenodes =
HAUtil.getProxiesForAllNameNodesInNameservice(dfsConf, nsId);
if (!HAUtil.isAtLeastOneActive(namenodes)) {
throw new IOException("Cannot finalize with no NameNode active");
}
for (ClientProtocol haNn : namenodes) {
haNn.finalizeUpgrade();
}
} else {
dfs.finalizeUpgrade();
}
return 0;
}

View File

@ -145,6 +145,72 @@ message DiscardSegmentsRequestProto {
message DiscardSegmentsResponseProto {
}
/**
* getJournalCTime()
*/
message GetJournalCTimeRequestProto {
required JournalIdProto jid = 1;
}
message GetJournalCTimeResponseProto {
required int64 resultCTime = 1;
}
/**
* doPreUpgrade()
*/
message DoPreUpgradeRequestProto {
required JournalIdProto jid = 1;
}
message DoPreUpgradeResponseProto {
}
/**
* doUpgrade()
*/
message DoUpgradeRequestProto {
required JournalIdProto jid = 1;
required StorageInfoProto sInfo = 2;
}
message DoUpgradeResponseProto {
}
/**
* doFinalize()
*/
message DoFinalizeRequestProto {
required JournalIdProto jid = 1;
}
message DoFinalizeResponseProto {
}
/**
* canRollBack()
*/
message CanRollBackRequestProto {
required JournalIdProto jid = 1;
required StorageInfoProto storage = 2;
required StorageInfoProto prevStorage = 3;
required int32 targetLayoutVersion = 4;
}
message CanRollBackResponseProto {
required bool canRollBack = 1;
}
/**
* doRollback()
*/
message DoRollbackRequestProto {
required JournalIdProto jid = 1;
}
message DoRollbackResponseProto {
}
/**
* getJournalState()
*/
@ -250,6 +316,18 @@ service QJournalProtocolService {
rpc discardSegments(DiscardSegmentsRequestProto) returns (DiscardSegmentsResponseProto);
rpc getJournalCTime(GetJournalCTimeRequestProto) returns (GetJournalCTimeResponseProto);
rpc doPreUpgrade(DoPreUpgradeRequestProto) returns (DoPreUpgradeResponseProto);
rpc doUpgrade(DoUpgradeRequestProto) returns (DoUpgradeResponseProto);
rpc doFinalize(DoFinalizeRequestProto) returns (DoFinalizeResponseProto);
rpc canRollBack(CanRollBackRequestProto) returns (CanRollBackResponseProto);
rpc doRollback(DoRollbackRequestProto) returns (DoRollbackResponseProto);
rpc getJournalState(GetJournalStateRequestProto) returns (GetJournalStateResponseProto);
rpc newEpoch(NewEpochRequestProto) returns (NewEpochResponseProto);

View File

@ -763,3 +763,49 @@ digest:hdfs-zkfcs:vlUvLnd8MlacsE80rDuu6ONESbM=:rwcda
Even if automatic failover is configured, you may initiate a manual failover
using the same <<<hdfs haadmin>>> command. It will perform a coordinated
failover.
* HDFS Upgrade/Finalization/Rollback with HA Enabled
When moving between versions of HDFS, sometimes the newer software can simply
be installed and the cluster restarted. Sometimes, however, upgrading the
version of HDFS you're running may require changing on-disk data. In this case,
one must use the HDFS Upgrade/Finalize/Rollback facility after installing the
new software. This process is made more complex in an HA environment, since the
on-disk metadata that the NN relies upon is by definition distributed, both on
the two HA NNs in the pair, and on the JournalNodes in the case that QJM is
being used for the shared edits storage. This documentation section describes
the procedure to use the HDFS Upgrade/Finalize/Rollback facility in an HA setup.
<<To perform an HA upgrade>>, the operator must do the following:
[[1]] Shut down all of the NNs as normal, and install the newer software.
[[2]] Start one of the NNs with the <<<'-upgrade'>>> flag.
[[3]] On start, this NN will not enter the standby state as usual in an HA
setup. Rather, this NN will immediately enter the active state, perform an
upgrade of its local storage dirs, and also perform an upgrade of the shared
edit log.
[[4]] At this point the other NN in the HA pair will be out of sync with
the upgraded NN. In order to bring it back in sync and once again have a highly
available setup, you should re-bootstrap this NameNode by running the NN with
the <<<'-bootstrapStandby'>>> flag. It is an error to start this second NN with
the <<<'-upgrade'>>> flag.
Note that if at any time you want to restart the NameNodes before finalizing
or rolling back the upgrade, you should start the NNs as normal, i.e. without
any special startup flag.
<<To finalize an HA upgrade>>, the operator will use the <<<`hdfsadmin
dfsadmin -finalizeUpgrade'>>> command while the NNs are running and one of them
is active. The active NN at the time this happens will perform the finalization
of the shared log, and the NN whose local storage directories contain the
previous FS state will delete its local state.
<<To perform a rollback>> of an upgrade, both NNs should first be shut down.
The operator should run the roll back command on the NN where they initiated
the upgrade procedure, which will perform the rollback on the local dirs there,
as well as on the shared log, either NFS or on the JNs. Afterward, this NN
should be started and the operator should run <<<`-bootstrapStandby'>>> on the
other NN to bring the two NNs in sync with this rolled-back file system state.

View File

@ -147,6 +147,7 @@ public class MiniDFSCluster {
private boolean enableManagedDfsDirsRedundancy = true;
private boolean manageDataDfsDirs = true;
private StartupOption option = null;
private StartupOption dnOption = null;
private String[] racks = null;
private String [] hosts = null;
private long [] simulatedCapacities = null;
@ -243,6 +244,14 @@ public class MiniDFSCluster {
return this;
}
/**
* Default: null
*/
public Builder dnStartupOption(StartupOption val) {
this.dnOption = val;
return this;
}
/**
* Default: null
*/
@ -371,6 +380,7 @@ public class MiniDFSCluster {
builder.enableManagedDfsDirsRedundancy,
builder.manageDataDfsDirs,
builder.option,
builder.dnOption,
builder.racks,
builder.hosts,
builder.simulatedCapacities,
@ -427,18 +437,24 @@ public class MiniDFSCluster {
/**
* Stores the information related to a namenode in the cluster
*/
static class NameNodeInfo {
public static class NameNodeInfo {
final NameNode nameNode;
final Configuration conf;
final String nameserviceId;
final String nnId;
StartupOption startOpt;
NameNodeInfo(NameNode nn, String nameserviceId, String nnId,
Configuration conf) {
StartupOption startOpt, Configuration conf) {
this.nameNode = nn;
this.nameserviceId = nameserviceId;
this.nnId = nnId;
this.startOpt = startOpt;
this.conf = conf;
}
public void setStartOpt(StartupOption startOpt) {
this.startOpt = startOpt;
}
}
/**
@ -622,8 +638,8 @@ public class MiniDFSCluster {
long[] simulatedCapacities) throws IOException {
this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
initMiniDFSCluster(conf, numDataNodes, StorageType.DEFAULT, format,
manageNameDfsDirs, true, true, manageDataDfsDirs,
operation, racks, hosts,
manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
operation, null, racks, hosts,
simulatedCapacities, null, true, false,
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false, null);
}
@ -632,7 +648,8 @@ public class MiniDFSCluster {
Configuration conf,
int numDataNodes, StorageType storageType, boolean format, boolean manageNameDfsDirs,
boolean manageNameDfsSharedDirs, boolean enableManagedDfsDirsRedundancy,
boolean manageDataDfsDirs, StartupOption operation, String[] racks,
boolean manageDataDfsDirs, StartupOption startOpt,
StartupOption dnStartOpt, String[] racks,
String[] hosts, long[] simulatedCapacities, String clusterId,
boolean waitSafeMode, boolean setupHostsFile,
MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown,
@ -685,7 +702,7 @@ public class MiniDFSCluster {
createNameNodesAndSetConf(
nnTopology, manageNameDfsDirs, manageNameDfsSharedDirs,
enableManagedDfsDirsRedundancy,
format, operation, clusterId, conf);
format, startOpt, clusterId, conf);
} catch (IOException ioe) {
LOG.error("IOE creating namenodes. Permissions dump:\n" +
createPermissionsDiagnosisString(data_dir));
@ -698,13 +715,14 @@ public class MiniDFSCluster {
}
}
if (operation == StartupOption.RECOVER) {
if (startOpt == StartupOption.RECOVER) {
return;
}
// Start the DataNodes
startDataNodes(conf, numDataNodes, storageType, manageDataDfsDirs,
operation, racks, hosts, simulatedCapacities, setupHostsFile,
dnStartOpt != null ? dnStartOpt : startOpt,
racks, hosts, simulatedCapacities, setupHostsFile,
checkDataNodeAddrConfig, checkDataNodeHostConfig, dnConfOverlays);
waitClusterUp();
//make sure ProxyUsers uses the latest conf
@ -783,6 +801,8 @@ public class MiniDFSCluster {
if (manageNameDfsSharedDirs) {
URI sharedEditsUri = getSharedEditsDir(nnCounter, nnCounter+nnIds.size()-1);
conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
// Clean out the shared edits dir completely, including all subdirectories.
FileUtil.fullyDelete(new File(sharedEditsUri));
}
}
@ -890,7 +910,8 @@ public class MiniDFSCluster {
URI srcDir = Lists.newArrayList(srcDirs).get(0);
FileSystem dstFS = FileSystem.getLocal(dstConf).getRaw();
for (URI dstDir : dstDirs) {
Preconditions.checkArgument(!dstDir.equals(srcDir));
Preconditions.checkArgument(!dstDir.equals(srcDir),
"src and dst are the same: " + dstDir);
File dstDirF = new File(dstDir);
if (dstDirF.exists()) {
if (!FileUtil.fullyDelete(dstDirF)) {
@ -924,6 +945,18 @@ public class MiniDFSCluster {
conf.set(key, "127.0.0.1:" + nnConf.getIpcPort());
}
private static String[] createArgs(StartupOption operation) {
if (operation == StartupOption.ROLLINGUPGRADE) {
return new String[]{operation.getName(),
operation.getRollingUpgradeStartupOption().name()};
}
String[] args = (operation == null ||
operation == StartupOption.FORMAT ||
operation == StartupOption.REGULAR) ?
new String[] {} : new String[] {operation.getName()};
return args;
}
private void createNameNode(int nnIndex, Configuration conf,
int numDataNodes, boolean format, StartupOption operation,
String clusterId, String nameserviceId,
@ -938,10 +971,7 @@ public class MiniDFSCluster {
}
// Start the NameNode
String[] args = (operation == null ||
operation == StartupOption.FORMAT ||
operation == StartupOption.REGULAR) ?
new String[] {} : new String[] {operation.getName()};
String[] args = createArgs(operation);
NameNode nn = NameNode.createNameNode(args, conf);
if (operation == StartupOption.RECOVER) {
return;
@ -963,7 +993,7 @@ public class MiniDFSCluster {
DFSUtil.setGenericConf(conf, nameserviceId, nnId,
DFS_NAMENODE_HTTP_ADDRESS_KEY);
nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId,
new Configuration(conf));
operation, new Configuration(conf));
}
/**
@ -1544,7 +1574,7 @@ public class MiniDFSCluster {
nn.stop();
nn.join();
Configuration conf = nameNodes[nnIndex].conf;
nameNodes[nnIndex] = new NameNodeInfo(null, null, null, conf);
nameNodes[nnIndex] = new NameNodeInfo(null, null, null, null, conf);
}
}
@ -1590,10 +1620,17 @@ public class MiniDFSCluster {
String... args) throws IOException {
String nameserviceId = nameNodes[nnIndex].nameserviceId;
String nnId = nameNodes[nnIndex].nnId;
StartupOption startOpt = nameNodes[nnIndex].startOpt;
Configuration conf = nameNodes[nnIndex].conf;
shutdownNameNode(nnIndex);
if (args.length != 0) {
startOpt = null;
} else {
args = createArgs(startOpt);
}
NameNode nn = NameNode.createNameNode(args, conf);
nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId, conf);
nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId, startOpt,
conf);
if (waitActive) {
waitClusterUp();
LOG.info("Restarted the namenode");

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.Test;
@ -98,10 +99,10 @@ public class TestDFSRollback {
* Attempts to start a NameNode with the given operation. Starting
* the NameNode should throw an exception.
*/
void startNameNodeShouldFail(StartupOption operation, String searchString) {
void startNameNodeShouldFail(String searchString) {
try {
NameNode.doRollback(conf, false);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.startupOption(operation)
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
@ -150,24 +151,19 @@ public class TestDFSRollback {
log("Normal NameNode rollback", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.startupOption(StartupOption.ROLLBACK)
.build();
NameNode.doRollback(conf, false);
checkResult(NAME_NODE, nameNodeDirs);
cluster.shutdown();
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("Normal DataNode rollback", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
NameNode.doRollback(conf, false);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.startupOption(StartupOption.ROLLBACK)
.dnStartupOption(StartupOption.ROLLBACK)
.build();
UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "previous");
@ -180,11 +176,12 @@ public class TestDFSRollback {
log("Normal BlockPool rollback", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
NameNode.doRollback(conf, false);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.startupOption(StartupOption.ROLLBACK)
.dnStartupOption(StartupOption.ROLLBACK)
.build();
UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
UpgradeUtilities.createBlockPoolStorageDirs(dataNodeDirs, "current",
@ -222,7 +219,7 @@ public class TestDFSRollback {
log("NameNode rollback without existing previous dir", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
startNameNodeShouldFail(StartupOption.ROLLBACK,
startNameNodeShouldFail(
"None of the storage directories contain previous fs state");
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
@ -243,11 +240,12 @@ public class TestDFSRollback {
log("DataNode rollback with future stored layout version in previous", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
NameNode.doRollback(conf, false);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.startupOption(StartupOption.ROLLBACK)
.dnStartupOption(StartupOption.ROLLBACK)
.build();
UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
baseDirs = UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "previous");
@ -269,11 +267,12 @@ public class TestDFSRollback {
log("DataNode rollback with newer fsscTime in previous", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
NameNode.doRollback(conf, false);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.startupOption(StartupOption.ROLLBACK)
.dnStartupOption(StartupOption.ROLLBACK)
.build();
UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
@ -296,16 +295,14 @@ public class TestDFSRollback {
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
deleteMatchingFiles(baseDirs, "edits.*");
startNameNodeShouldFail(StartupOption.ROLLBACK,
"Gap in transactions");
startNameNodeShouldFail("Gap in transactions");
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("NameNode rollback with no image file", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
deleteMatchingFiles(baseDirs, "fsimage_.*");
startNameNodeShouldFail(StartupOption.ROLLBACK,
"No valid image files found");
startNameNodeShouldFail("No valid image files found");
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("NameNode rollback with corrupt version file", numDirs);
@ -317,8 +314,7 @@ public class TestDFSRollback {
"layoutVersion".getBytes(Charsets.UTF_8),
"xxxxxxxxxxxxx".getBytes(Charsets.UTF_8));
}
startNameNodeShouldFail(StartupOption.ROLLBACK,
"file VERSION has layoutVersion missing");
startNameNodeShouldFail("file VERSION has layoutVersion missing");
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
@ -332,8 +328,7 @@ public class TestDFSRollback {
UpgradeUtilities.createNameNodeVersionFile(conf, baseDirs,
storageInfo, UpgradeUtilities.getCurrentBlockPoolID(cluster));
startNameNodeShouldFail(StartupOption.ROLLBACK,
"Cannot rollback to storage version 1 using this version");
startNameNodeShouldFail("Cannot rollback to storage version 1 using this version");
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
} // end numDir loop
}

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Shell;
import org.junit.Assume;
import org.junit.Before;
@ -758,4 +759,37 @@ public class TestDFSUtil {
assertEquals(4*24*60*60*1000l, DFSUtil.parseRelativeTime("4d"));
assertEquals(999*24*60*60*1000l, DFSUtil.parseRelativeTime("999d"));
}
@Test
public void testAssertAllResultsEqual() {
checkAllResults(new Long[]{}, true);
checkAllResults(new Long[]{1l}, true);
checkAllResults(new Long[]{1l, 1l}, true);
checkAllResults(new Long[]{1l, 1l, 1l}, true);
checkAllResults(new Long[]{new Long(1), new Long(1)}, true);
checkAllResults(new Long[]{null, null, null}, true);
checkAllResults(new Long[]{1l, 2l}, false);
checkAllResults(new Long[]{2l, 1l}, false);
checkAllResults(new Long[]{1l, 2l, 1l}, false);
checkAllResults(new Long[]{2l, 1l, 1l}, false);
checkAllResults(new Long[]{1l, 1l, 2l}, false);
checkAllResults(new Long[]{1l, null}, false);
checkAllResults(new Long[]{null, 1l}, false);
checkAllResults(new Long[]{1l, null, 1l}, false);
}
private static void checkAllResults(Long[] toCheck, boolean shouldSucceed) {
if (shouldSucceed) {
DFSUtil.assertAllResultsEqual(Arrays.asList(toCheck));
} else {
try {
DFSUtil.assertAllResultsEqual(Arrays.asList(toCheck));
fail("Should not have succeeded with input: " +
Arrays.toString(toCheck));
} catch (AssertionError ae) {
GenericTestUtils.assertExceptionContains("Not all elements match", ae);
}
}
}
}

View File

@ -204,7 +204,7 @@ public class TestRollingUpgrade {
.format(false)
.manageNameDfsDirs(false)
.build();
DistributedFileSystem dfs2 = cluster2.getFileSystem();
final DistributedFileSystem dfs2 = cluster2.getFileSystem();
// Check that cluster2 sees the edits made on cluster1
Assert.assertTrue(dfs2.exists(foo));
@ -243,7 +243,8 @@ public class TestRollingUpgrade {
Assert.assertEquals(info1.getStartTime(), finalize.getStartTime());
LOG.info("RESTART cluster 2 with regular startup option");
cluster2.restartNameNode(StartupOption.REGULAR.getName());
cluster2.getNameNodeInfos()[0].setStartOpt(StartupOption.REGULAR);
cluster2.restartNameNode();
Assert.assertTrue(dfs2.exists(foo));
Assert.assertTrue(dfs2.exists(bar));
Assert.assertTrue(dfs2.exists(baz));

View File

@ -167,8 +167,16 @@ public class MiniJournalCluster {
return new File(baseDir, "journalnode-" + idx).getAbsoluteFile();
}
public File getJournalDir(int idx, String jid) {
return new File(getStorageDir(idx), jid);
}
public File getCurrentDir(int idx, String jid) {
return new File(new File(getStorageDir(idx), jid), "current");
return new File(getJournalDir(idx, jid), "current");
}
public File getPreviousDir(int idx, String jid) {
return new File(getJournalDir(idx, jid), "previous");
}
public JournalNode getJournalNode(int i) {

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
@ -47,6 +48,7 @@ public class MiniQJMHACluster {
public static class Builder {
private final Configuration conf;
private StartupOption startOpt = null;
private final MiniDFSCluster.Builder dfsBuilder;
public Builder(Configuration conf) {
@ -61,6 +63,10 @@ public class MiniQJMHACluster {
public MiniQJMHACluster build() throws IOException {
return new MiniQJMHACluster(this);
}
public void startupOption(StartupOption startOpt) {
this.startOpt = startOpt;
}
}
public static MiniDFSNNTopology createDefaultTopology() {
@ -95,6 +101,9 @@ public class MiniQJMHACluster {
Configuration confNN0 = cluster.getConfiguration(0);
NameNode.initializeSharedEdits(confNN0, true);
cluster.getNameNodeInfos()[0].setStartOpt(builder.startOpt);
cluster.getNameNodeInfos()[1].setStartOpt(builder.startOpt);
// restart the cluster
cluster.restartNameNodes();
}

View File

@ -17,21 +17,22 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.junit.Test;
public class TestGenericJournalConf {
@ -197,13 +198,59 @@ public class TestGenericJournalConf {
return false;
}
@Override
public void doPreUpgrade() throws IOException {}
@Override
public void doUpgrade(Storage storage) throws IOException {}
@Override
public void doFinalize() throws IOException {}
@Override
public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage, int targetLayoutVersion)
throws IOException {
return false;
}
@Override
public void doRollback() throws IOException {}
@Override
public void discardSegments(long startTxId) throws IOException {}
@Override
public long getJournalCTime() throws IOException {
return -1;
}
}
public static class BadConstructorJournalManager extends DummyJournalManager {
public BadConstructorJournalManager() {
super(null, null, null);
}
@Override
public void doPreUpgrade() throws IOException {}
@Override
public void doUpgrade(Storage storage) throws IOException {}
@Override
public void doFinalize() throws IOException {}
@Override
public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage, int targetLayoutVersion)
throws IOException {
return false;
}
@Override
public void doRollback() throws IOException {}
@Override
public long getJournalCTime() throws IOException {
return -1;
}
}
}

View File

@ -91,7 +91,7 @@ public class TestBootstrapStandby {
fail("Did not throw");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
"Cannot start an HA namenode with name dirs that need recovery",
"storage directory does not exist or is not accessible",
ioe);
}

View File

@ -1,40 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster.Builder;
import org.apache.hadoop.hdfs.qjournal.server.Journal;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.hdfs.util.PersistentLongFile;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
import com.google.common.base.Joiner;
/**
* Tests for upgrading with HA enabled.
@ -43,43 +59,444 @@ public class TestDFSUpgradeWithHA {
private static final Log LOG = LogFactory.getLog(TestDFSUpgradeWithHA.class);
/**
* Make sure that an HA NN refuses to start if given an upgrade-related
* startup option.
*/
@Test
public void testStartingWithUpgradeOptionsFails() throws IOException {
for (StartupOption startOpt : Lists.newArrayList(new StartupOption[] {
StartupOption.UPGRADE, StartupOption.FINALIZE,
StartupOption.ROLLBACK })) {
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(new Configuration())
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.startupOption(startOpt)
.numDataNodes(0)
.build();
fail("Should not have been able to start an HA NN in upgrade mode");
} catch (IllegalArgumentException iae) {
GenericTestUtils.assertExceptionContains(
"Cannot perform DFS upgrade with HA enabled.", iae);
LOG.info("Got expected exception", iae);
} finally {
if (cluster != null) {
cluster.shutdown();
private Configuration conf;
@Before
public void createConfiguration() {
conf = new HdfsConfiguration();
// Turn off persistent IPC, so that the DFSClient can survive NN restart
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
0);
}
private static void assertCTimesEqual(MiniDFSCluster cluster) {
long nn1CTime = cluster.getNamesystem(0).getFSImage().getStorage().getCTime();
long nn2CTime = cluster.getNamesystem(1).getFSImage().getStorage().getCTime();
assertEquals(nn1CTime, nn2CTime);
}
private static void checkClusterPreviousDirExistence(MiniDFSCluster cluster,
boolean shouldExist) {
for (int i = 0; i < 2; i++) {
checkNnPreviousDirExistence(cluster, i, shouldExist);
}
}
private static void checkNnPreviousDirExistence(MiniDFSCluster cluster,
int index, boolean shouldExist) {
Collection<URI> nameDirs = cluster.getNameDirs(index);
for (URI nnDir : nameDirs) {
checkPreviousDirExistence(new File(nnDir), shouldExist);
}
}
private static void checkJnPreviousDirExistence(MiniQJMHACluster jnCluster,
boolean shouldExist) throws IOException {
for (int i = 0; i < 3; i++) {
checkPreviousDirExistence(
jnCluster.getJournalCluster().getJournalDir(i, "ns1"), shouldExist);
}
if (shouldExist) {
assertEpochFilesCopied(jnCluster);
}
}
private static void assertEpochFilesCopied(MiniQJMHACluster jnCluster)
throws IOException {
for (int i = 0; i < 3; i++) {
File journalDir = jnCluster.getJournalCluster().getJournalDir(i, "ns1");
File currDir = new File(journalDir, "current");
File prevDir = new File(journalDir, "previous");
for (String fileName : new String[]{ Journal.LAST_PROMISED_FILENAME,
Journal.LAST_WRITER_EPOCH }) {
File prevFile = new File(prevDir, fileName);
// Possible the prev file doesn't exist, e.g. if there has never been a
// writer before the upgrade.
if (prevFile.exists()) {
PersistentLongFile prevLongFile = new PersistentLongFile(prevFile, -10);
PersistentLongFile currLongFile = new PersistentLongFile(new File(currDir,
fileName), -11);
assertTrue("Value in " + fileName + " has decreased on upgrade in "
+ journalDir, prevLongFile.get() <= currLongFile.get());
}
}
}
}
private static void checkPreviousDirExistence(File rootDir,
boolean shouldExist) {
File previousDir = new File(rootDir, "previous");
if (shouldExist) {
assertTrue(previousDir + " does not exist", previousDir.exists());
} else {
assertFalse(previousDir + " does exist", previousDir.exists());
}
}
private void runFinalizeCommand(MiniDFSCluster cluster)
throws IOException {
HATestUtil.setFailoverConfigurations(cluster, conf);
new DFSAdmin(conf).finalizeUpgrade();
}
/**
* Make sure that an HA NN won't start if a previous upgrade was in progress.
* Ensure that an admin cannot finalize an HA upgrade without at least one NN
* being active.
*/
@Test
public void testStartingWithUpgradeInProgressFails() throws Exception {
public void testCannotFinalizeIfNoActive() throws IOException,
URISyntaxException {
MiniDFSCluster cluster = null;
FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.build();
File sharedDir = new File(cluster.getSharedEditsDir(0, 1));
// No upgrade is in progress at the moment.
checkClusterPreviousDirExistence(cluster, false);
assertCTimesEqual(cluster);
checkPreviousDirExistence(sharedDir, false);
// Transition NN0 to active and do some FS ops.
cluster.transitionToActive(0);
fs = HATestUtil.configureFailoverFs(cluster, conf);
assertTrue(fs.mkdirs(new Path("/foo1")));
// Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
// flag.
cluster.shutdownNameNode(1);
cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
cluster.restartNameNode(0, false);
checkNnPreviousDirExistence(cluster, 0, true);
checkNnPreviousDirExistence(cluster, 1, false);
checkPreviousDirExistence(sharedDir, true);
// NN0 should come up in the active state when given the -upgrade option,
// so no need to transition it to active.
assertTrue(fs.mkdirs(new Path("/foo2")));
// Restart NN0 without the -upgrade flag, to make sure that works.
cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.REGULAR);
cluster.restartNameNode(0, false);
// Make sure we can still do FS ops after upgrading.
cluster.transitionToActive(0);
assertTrue(fs.mkdirs(new Path("/foo3")));
// Now bootstrap the standby with the upgraded info.
int rc = BootstrapStandby.run(
new String[]{"-force"},
cluster.getConfiguration(1));
assertEquals(0, rc);
// Now restart NN1 and make sure that we can do ops against that as well.
cluster.restartNameNode(1);
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
assertTrue(fs.mkdirs(new Path("/foo4")));
assertCTimesEqual(cluster);
// Now there's no active NN.
cluster.transitionToStandby(1);
try {
runFinalizeCommand(cluster);
fail("Should not have been able to finalize upgrade with no NN active");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
"Cannot finalize with no NameNode active", ioe);
}
} finally {
if (fs != null) {
fs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Make sure that an HA NN with NFS-based HA can successfully start and
* upgrade.
*/
@Test
public void testNfsUpgrade() throws IOException, URISyntaxException {
MiniDFSCluster cluster = null;
FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.build();
File sharedDir = new File(cluster.getSharedEditsDir(0, 1));
// No upgrade is in progress at the moment.
checkClusterPreviousDirExistence(cluster, false);
assertCTimesEqual(cluster);
checkPreviousDirExistence(sharedDir, false);
// Transition NN0 to active and do some FS ops.
cluster.transitionToActive(0);
fs = HATestUtil.configureFailoverFs(cluster, conf);
assertTrue(fs.mkdirs(new Path("/foo1")));
// Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
// flag.
cluster.shutdownNameNode(1);
cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
cluster.restartNameNode(0, false);
checkNnPreviousDirExistence(cluster, 0, true);
checkNnPreviousDirExistence(cluster, 1, false);
checkPreviousDirExistence(sharedDir, true);
// NN0 should come up in the active state when given the -upgrade option,
// so no need to transition it to active.
assertTrue(fs.mkdirs(new Path("/foo2")));
// Restart NN0 without the -upgrade flag, to make sure that works.
cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.REGULAR);
cluster.restartNameNode(0, false);
// Make sure we can still do FS ops after upgrading.
cluster.transitionToActive(0);
assertTrue(fs.mkdirs(new Path("/foo3")));
// Now bootstrap the standby with the upgraded info.
int rc = BootstrapStandby.run(
new String[]{"-force"},
cluster.getConfiguration(1));
assertEquals(0, rc);
// Now restart NN1 and make sure that we can do ops against that as well.
cluster.restartNameNode(1);
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
assertTrue(fs.mkdirs(new Path("/foo4")));
assertCTimesEqual(cluster);
} finally {
if (fs != null) {
fs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Make sure that an HA NN can successfully upgrade when configured using
* JournalNodes.
*/
@Test
public void testUpgradeWithJournalNodes() throws IOException,
URISyntaxException {
MiniQJMHACluster qjCluster = null;
FileSystem fs = null;
try {
Builder builder = new MiniQJMHACluster.Builder(conf);
builder.getDfsBuilder()
.numDataNodes(0);
qjCluster = builder.build();
MiniDFSCluster cluster = qjCluster.getDfsCluster();
// No upgrade is in progress at the moment.
checkJnPreviousDirExistence(qjCluster, false);
checkClusterPreviousDirExistence(cluster, false);
assertCTimesEqual(cluster);
// Transition NN0 to active and do some FS ops.
cluster.transitionToActive(0);
fs = HATestUtil.configureFailoverFs(cluster, conf);
assertTrue(fs.mkdirs(new Path("/foo1")));
// Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
// flag.
cluster.shutdownNameNode(1);
cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
cluster.restartNameNode(0, false);
checkNnPreviousDirExistence(cluster, 0, true);
checkNnPreviousDirExistence(cluster, 1, false);
checkJnPreviousDirExistence(qjCluster, true);
// NN0 should come up in the active state when given the -upgrade option,
// so no need to transition it to active.
assertTrue(fs.mkdirs(new Path("/foo2")));
// Restart NN0 without the -upgrade flag, to make sure that works.
cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.REGULAR);
cluster.restartNameNode(0, false);
// Make sure we can still do FS ops after upgrading.
cluster.transitionToActive(0);
assertTrue(fs.mkdirs(new Path("/foo3")));
// Now bootstrap the standby with the upgraded info.
int rc = BootstrapStandby.run(
new String[]{"-force"},
cluster.getConfiguration(1));
assertEquals(0, rc);
// Now restart NN1 and make sure that we can do ops against that as well.
cluster.restartNameNode(1);
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
assertTrue(fs.mkdirs(new Path("/foo4")));
assertCTimesEqual(cluster);
} finally {
if (fs != null) {
fs.close();
}
if (qjCluster != null) {
qjCluster.shutdown();
}
}
}
@Test
public void testFinalizeWithJournalNodes() throws IOException,
URISyntaxException {
MiniQJMHACluster qjCluster = null;
FileSystem fs = null;
try {
Builder builder = new MiniQJMHACluster.Builder(conf);
builder.getDfsBuilder()
.numDataNodes(0);
qjCluster = builder.build();
MiniDFSCluster cluster = qjCluster.getDfsCluster();
// No upgrade is in progress at the moment.
checkJnPreviousDirExistence(qjCluster, false);
checkClusterPreviousDirExistence(cluster, false);
assertCTimesEqual(cluster);
// Transition NN0 to active and do some FS ops.
cluster.transitionToActive(0);
fs = HATestUtil.configureFailoverFs(cluster, conf);
assertTrue(fs.mkdirs(new Path("/foo1")));
// Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
// flag.
cluster.shutdownNameNode(1);
cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
cluster.restartNameNode(0, false);
assertTrue(fs.mkdirs(new Path("/foo2")));
checkNnPreviousDirExistence(cluster, 0, true);
checkNnPreviousDirExistence(cluster, 1, false);
checkJnPreviousDirExistence(qjCluster, true);
// Now bootstrap the standby with the upgraded info.
int rc = BootstrapStandby.run(
new String[]{"-force"},
cluster.getConfiguration(1));
assertEquals(0, rc);
cluster.restartNameNode(1);
runFinalizeCommand(cluster);
checkClusterPreviousDirExistence(cluster, false);
checkJnPreviousDirExistence(qjCluster, false);
assertCTimesEqual(cluster);
} finally {
if (fs != null) {
fs.close();
}
if (qjCluster != null) {
qjCluster.shutdown();
}
}
}
/**
* Make sure that even if the NN which initiated the upgrade is in the standby
* state that we're allowed to finalize.
*/
@Test
public void testFinalizeFromSecondNameNodeWithJournalNodes()
throws IOException, URISyntaxException {
MiniQJMHACluster qjCluster = null;
FileSystem fs = null;
try {
Builder builder = new MiniQJMHACluster.Builder(conf);
builder.getDfsBuilder()
.numDataNodes(0);
qjCluster = builder.build();
MiniDFSCluster cluster = qjCluster.getDfsCluster();
// No upgrade is in progress at the moment.
checkJnPreviousDirExistence(qjCluster, false);
checkClusterPreviousDirExistence(cluster, false);
assertCTimesEqual(cluster);
// Transition NN0 to active and do some FS ops.
cluster.transitionToActive(0);
fs = HATestUtil.configureFailoverFs(cluster, conf);
assertTrue(fs.mkdirs(new Path("/foo1")));
// Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
// flag.
cluster.shutdownNameNode(1);
cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
cluster.restartNameNode(0, false);
checkNnPreviousDirExistence(cluster, 0, true);
checkNnPreviousDirExistence(cluster, 1, false);
checkJnPreviousDirExistence(qjCluster, true);
// Now bootstrap the standby with the upgraded info.
int rc = BootstrapStandby.run(
new String[]{"-force"},
cluster.getConfiguration(1));
assertEquals(0, rc);
cluster.restartNameNode(1);
// Make the second NN (not the one that initiated the upgrade) active when
// the finalize command is run.
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
runFinalizeCommand(cluster);
checkClusterPreviousDirExistence(cluster, false);
checkJnPreviousDirExistence(qjCluster, false);
assertCTimesEqual(cluster);
} finally {
if (fs != null) {
fs.close();
}
if (qjCluster != null) {
qjCluster.shutdown();
}
}
}
/**
* Make sure that an HA NN will start if a previous upgrade was in progress.
*/
@Test
public void testStartingWithUpgradeInProgressSucceeds() throws Exception {
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(new Configuration())
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.build();
@ -94,16 +511,224 @@ public class TestDFSUpgradeWithHA {
}
cluster.restartNameNodes();
fail("Should not have been able to start an HA NN with an in-progress upgrade");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
"Cannot start an HA namenode with name dirs that need recovery.",
ioe);
LOG.info("Got expected exception", ioe);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test rollback with NFS shared dir.
*/
@Test
public void testRollbackWithNfs() throws Exception {
MiniDFSCluster cluster = null;
FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.build();
File sharedDir = new File(cluster.getSharedEditsDir(0, 1));
// No upgrade is in progress at the moment.
checkClusterPreviousDirExistence(cluster, false);
assertCTimesEqual(cluster);
checkPreviousDirExistence(sharedDir, false);
// Transition NN0 to active and do some FS ops.
cluster.transitionToActive(0);
fs = HATestUtil.configureFailoverFs(cluster, conf);
assertTrue(fs.mkdirs(new Path("/foo1")));
// Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
// flag.
cluster.shutdownNameNode(1);
cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
cluster.restartNameNode(0, false);
checkNnPreviousDirExistence(cluster, 0, true);
checkNnPreviousDirExistence(cluster, 1, false);
checkPreviousDirExistence(sharedDir, true);
// NN0 should come up in the active state when given the -upgrade option,
// so no need to transition it to active.
assertTrue(fs.mkdirs(new Path("/foo2")));
// Now bootstrap the standby with the upgraded info.
int rc = BootstrapStandby.run(
new String[]{"-force"},
cluster.getConfiguration(1));
assertEquals(0, rc);
cluster.restartNameNode(1);
checkNnPreviousDirExistence(cluster, 0, true);
checkNnPreviousDirExistence(cluster, 1, false);
checkPreviousDirExistence(sharedDir, true);
assertCTimesEqual(cluster);
// Now shut down the cluster and do the rollback.
Collection<URI> nn1NameDirs = cluster.getNameDirs(0);
cluster.shutdown();
conf.setStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, Joiner.on(",").join(nn1NameDirs));
NameNode.doRollback(conf, false);
// The rollback operation should have rolled back the first NN's local
// dirs, and the shared dir, but not the other NN's dirs. Those have to be
// done by bootstrapping the standby.
checkNnPreviousDirExistence(cluster, 0, false);
checkPreviousDirExistence(sharedDir, false);
} finally {
if (fs != null) {
fs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testRollbackWithJournalNodes() throws IOException,
URISyntaxException {
MiniQJMHACluster qjCluster = null;
FileSystem fs = null;
try {
Builder builder = new MiniQJMHACluster.Builder(conf);
builder.getDfsBuilder()
.numDataNodes(0);
qjCluster = builder.build();
MiniDFSCluster cluster = qjCluster.getDfsCluster();
// No upgrade is in progress at the moment.
checkClusterPreviousDirExistence(cluster, false);
assertCTimesEqual(cluster);
checkJnPreviousDirExistence(qjCluster, false);
// Transition NN0 to active and do some FS ops.
cluster.transitionToActive(0);
fs = HATestUtil.configureFailoverFs(cluster, conf);
assertTrue(fs.mkdirs(new Path("/foo1")));
// Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
// flag.
cluster.shutdownNameNode(1);
cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
cluster.restartNameNode(0, false);
checkNnPreviousDirExistence(cluster, 0, true);
checkNnPreviousDirExistence(cluster, 1, false);
checkJnPreviousDirExistence(qjCluster, true);
// NN0 should come up in the active state when given the -upgrade option,
// so no need to transition it to active.
assertTrue(fs.mkdirs(new Path("/foo2")));
// Now bootstrap the standby with the upgraded info.
int rc = BootstrapStandby.run(
new String[]{"-force"},
cluster.getConfiguration(1));
assertEquals(0, rc);
cluster.restartNameNode(1);
checkNnPreviousDirExistence(cluster, 0, true);
checkNnPreviousDirExistence(cluster, 1, false);
checkJnPreviousDirExistence(qjCluster, true);
assertCTimesEqual(cluster);
// Shut down the NNs, but deliberately leave the JNs up and running.
Collection<URI> nn1NameDirs = cluster.getNameDirs(0);
cluster.shutdown();
conf.setStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, Joiner.on(",").join(nn1NameDirs));
NameNode.doRollback(conf, false);
// The rollback operation should have rolled back the first NN's local
// dirs, and the shared dir, but not the other NN's dirs. Those have to be
// done by bootstrapping the standby.
checkNnPreviousDirExistence(cluster, 0, false);
checkJnPreviousDirExistence(qjCluster, false);
} finally {
if (fs != null) {
fs.close();
}
if (qjCluster != null) {
qjCluster.shutdown();
}
}
}
/**
* Make sure that starting a second NN with the -upgrade flag fails if the
* other NN has already done that.
*/
@Test
public void testCannotUpgradeSecondNameNode() throws IOException,
URISyntaxException {
MiniDFSCluster cluster = null;
FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.build();
File sharedDir = new File(cluster.getSharedEditsDir(0, 1));
// No upgrade is in progress at the moment.
checkClusterPreviousDirExistence(cluster, false);
assertCTimesEqual(cluster);
checkPreviousDirExistence(sharedDir, false);
// Transition NN0 to active and do some FS ops.
cluster.transitionToActive(0);
fs = HATestUtil.configureFailoverFs(cluster, conf);
assertTrue(fs.mkdirs(new Path("/foo1")));
// Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
// flag.
cluster.shutdownNameNode(1);
cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
cluster.restartNameNode(0, false);
checkNnPreviousDirExistence(cluster, 0, true);
checkNnPreviousDirExistence(cluster, 1, false);
checkPreviousDirExistence(sharedDir, true);
// NN0 should come up in the active state when given the -upgrade option,
// so no need to transition it to active.
assertTrue(fs.mkdirs(new Path("/foo2")));
// Restart NN0 without the -upgrade flag, to make sure that works.
cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.REGULAR);
cluster.restartNameNode(0, false);
// Make sure we can still do FS ops after upgrading.
cluster.transitionToActive(0);
assertTrue(fs.mkdirs(new Path("/foo3")));
// Make sure that starting the second NN with the -upgrade flag fails.
cluster.getNameNodeInfos()[1].setStartOpt(StartupOption.UPGRADE);
try {
cluster.restartNameNode(1, false);
fail("Should not have been able to start second NN with -upgrade");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
"It looks like the shared log is already being upgraded", ioe);
}
} finally {
if (fs != null) {
fs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -96,7 +96,7 @@ public class TestInitializeSharedEdits {
} catch (IOException ioe) {
LOG.info("Got expected exception", ioe);
GenericTestUtils.assertExceptionContains(
"Cannot start an HA namenode with name dirs that need recovery", ioe);
"storage directory does not exist or is not accessible", ioe);
}
try {
cluster.restartNameNode(1, false);
@ -104,7 +104,7 @@ public class TestInitializeSharedEdits {
} catch (IOException ioe) {
LOG.info("Got expected exception", ioe);
GenericTestUtils.assertExceptionContains(
"Cannot start an HA namenode with name dirs that need recovery", ioe);
"storage directory does not exist or is not accessible", ioe);
}
}