HDFS-4025. QJM: Synchronize past log segments to JNs that missed them. Contributed by Hanisha Koneru.

Jing Zhao 2017-02-22 16:33:38 -08:00
parent b10e962224
commit 13d4bcfe35
14 changed files with 853 additions and 42 deletions

View File

@ -707,6 +707,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize";
public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024;
// Edit Log segment transfer timeout
public static final String DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY =
"dfs.edit.log.transfer.timeout";
public static final int DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT = 30 * 1000;
// Throttling Edit Log Segment transfer for Journal Sync
public static final String DFS_EDIT_LOG_TRANSFER_RATE_KEY =
"dfs.edit.log.transfer.bandwidthPerSec";
public static final long DFS_EDIT_LOG_TRANSFER_RATE_DEFAULT = 0; //no throttling
// Datanode File IO Stats
public static final String DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY =
"dfs.datanode.enable.fileio.profiling";
@ -891,6 +901,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_JOURNALNODE_KEYTAB_FILE_KEY = "dfs.journalnode.keytab.file";
public static final String DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY = "dfs.journalnode.kerberos.principal";
public static final String DFS_JOURNALNODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY = "dfs.journalnode.kerberos.internal.spnego.principal";
public static final String DFS_JOURNALNODE_ENABLE_SYNC_KEY =
"dfs.journalnode.enable.sync";
public static final boolean DFS_JOURNALNODE_ENABLE_SYNC_DEFAULT = false;
public static final String DFS_JOURNALNODE_SYNC_INTERVAL_KEY =
"dfs.journalnode.sync.interval";
public static final long DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT = 2*60*1000L;
// Journal-node related configs for the client side.
public static final String DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb";

View File

@ -21,7 +21,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@ -42,6 +41,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRe
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
@ -51,8 +51,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@ -362,41 +360,17 @@ public class QuorumJournalManager implements JournalManager {
URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory)
throws IOException {
List<AsyncLogger> ret = Lists.newArrayList();
List<InetSocketAddress> addrs = getLoggerAddresses(uri);
List<InetSocketAddress> addrs = Util.getAddressesList(uri);
if (addrs.size() % 2 == 0) {
LOG.warn("Quorum journal URI '" + uri + "' has an even number " +
"of Journal Nodes specified. This is not recommended!");
}
String jid = parseJournalId(uri);
for (InetSocketAddress addr : addrs) {
ret.add(factory.createLogger(conf, nsInfo, jid, addr));
}
return ret;
}
private static List<InetSocketAddress> getLoggerAddresses(URI uri)
throws IOException {
String authority = uri.getAuthority();
Preconditions.checkArgument(authority != null && !authority.isEmpty(),
"URI has no authority: " + uri);
String[] parts = StringUtils.split(authority, ';');
for (int i = 0; i < parts.length; i++) {
parts[i] = parts[i].trim();
}
if (parts.length % 2 == 0) {
LOG.warn("Quorum journal URI '" + uri + "' has an even number " +
"of Journal Nodes specified. This is not recommended!");
}
List<InetSocketAddress> addrs = Lists.newArrayList();
for (String addr : parts) {
InetSocketAddress isa = NetUtils.createSocketAddr(
addr, DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT);
if (isa.isUnresolved()) {
throw new UnknownHostException(addr);
}
addrs.add(isa);
}
return addrs;
}
@Override
public EditLogOutputStream startLogSegment(long txId, int layoutVersion)

View File

@ -49,7 +49,6 @@ class JNStorage extends Storage {
private final FileJournalManager fjm;
private final StorageDirectory sd;
private StorageState state;
private static final List<Pattern> CURRENT_DIR_PURGE_REGEXES =
ImmutableList.of(
@ -121,6 +120,14 @@ class JNStorage extends Storage {
return new File(sd.getCurrentDir(), name);
}
File getTemporaryEditsFile(long startTxId, long endTxId, long timestamp) {
return NNStorage.getTemporaryEditsFile(sd, startTxId, endTxId, timestamp);
}
File getFinalizedEditsFile(long startTxId, long endTxId) {
return NNStorage.getFinalizedEditsFile(sd, startTxId, endTxId);
}
/**
* @return the path for the file which contains persisted data for the
* paxos-like recovery process for the given log segment.

View File

@ -1092,6 +1092,25 @@ public class Journal implements Closeable {
committedTxnId.set(startTxId - 1);
}
synchronized boolean renameTmpSegment(File tmpFile, File finalFile,
long endTxId) throws IOException {
final boolean success;
if (endTxId <= committedTxnId.get()) {
success = tmpFile.renameTo(finalFile);
if (!success) {
LOG.warn("Unable to rename edits file from " + tmpFile + " to " +
finalFile);
}
} else {
success = false;
LOG.error("The endTxId of the temporary file is not less than the " +
"last committed transaction id. Aborting renaming to final file" +
finalFile);
}
return success;
}
public Long getJournalCTime() throws IOException {
return storage.getJournalManager().getJournalCTime();
}

View File

@ -68,6 +68,8 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
private JournalNodeRpcServer rpcServer;
private JournalNodeHttpServer httpServer;
private final Map<String, Journal> journalsById = Maps.newHashMap();
private final Map<String, JournalNodeSyncer> journalSyncersById = Maps
.newHashMap();
private ObjectName journalNodeInfoBeanName;
private String httpServerURI;
private File localDir;
@ -92,11 +94,24 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
LOG.info("Initializing journal in directory " + logDir);
journal = new Journal(conf, logDir, jid, startOpt, new ErrorReporter());
journalsById.put(jid, journal);
// Start SyncJournal thread, if JournalNode sync is enabled
if (conf.getBoolean(
DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY,
DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_DEFAULT)) {
startSyncer(journal, jid);
}
}
return journal;
}
private void startSyncer(Journal journal, String jid) {
JournalNodeSyncer jSyncer = new JournalNodeSyncer(this, journal, jid, conf);
journalSyncersById.put(jid, jSyncer);
jSyncer.start();
}
@VisibleForTesting
public Journal getOrCreateJournal(String jid) throws IOException {
return getOrCreateJournal(jid, StartupOption.REGULAR);
@ -190,7 +205,11 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
*/
public void stop(int rc) {
this.resultCode = rc;
for (JournalNodeSyncer jSyncer : journalSyncersById.values()) {
jSyncer.stopSync();
}
if (rpcServer != null) {
rpcServer.stop();
}

View File

@ -0,0 +1,413 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.qjournal.server;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos
.JournalIdProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos
.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos
.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.List;
/**
* A Journal Sync thread runs through the lifetime of the JN. It periodically
* gossips with other journal nodes to compare edit log manifests and, if it
* detects any missing log segment, downloads it from the other journal node.
*/
@InterfaceAudience.Private
public class JournalNodeSyncer {
public static final Logger LOG = LoggerFactory.getLogger(
JournalNodeSyncer.class);
private final JournalNode jn;
private final Journal journal;
private final String jid;
private final JournalIdProto jidProto;
private final JNStorage jnStorage;
private final Configuration conf;
private volatile Daemon syncJournalDaemon;
private volatile boolean shouldSync = true;
private List<JournalNodeProxy> otherJNProxies = Lists.newArrayList();
private int numOtherJNs;
private int journalNodeIndexForSync = 0;
private final long journalSyncInterval;
private final int logSegmentTransferTimeout;
private final DataTransferThrottler throttler;
JournalNodeSyncer(JournalNode journalNode, Journal journal, String jid,
Configuration conf) {
this.jn = journalNode;
this.journal = journal;
this.jid = jid;
this.jidProto = convertJournalId(this.jid);
this.jnStorage = journal.getStorage();
this.conf = conf;
journalSyncInterval = conf.getLong(
DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY,
DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT);
logSegmentTransferTimeout = conf.getInt(
DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY,
DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT);
throttler = getThrottler(conf);
}
void stopSync() {
shouldSync = false;
if (syncJournalDaemon != null) {
syncJournalDaemon.interrupt();
}
}
public void start() {
LOG.info("Starting SyncJournal daemon for journal " + jid);
if (getOtherJournalNodeProxies()) {
startSyncJournalsDaemon();
} else {
LOG.warn("Failed to start SyncJournal daemon for journal " + jid);
}
}
private boolean getOtherJournalNodeProxies() {
List<InetSocketAddress> otherJournalNodes = getOtherJournalNodeAddrs();
if (otherJournalNodes == null || otherJournalNodes.isEmpty()) {
LOG.warn("Other JournalNode addresses not available. Journal Syncing " +
"cannot be done");
return false;
}
for (InetSocketAddress addr : otherJournalNodes) {
try {
otherJNProxies.add(new JournalNodeProxy(addr));
} catch (IOException e) {
LOG.warn("Could not add proxy for Journal at addresss " + addr, e);
}
}
if (otherJNProxies.isEmpty()) {
LOG.error("Cannot sync as there is no other JN available for sync.");
return false;
}
numOtherJNs = otherJNProxies.size();
return true;
}
private void startSyncJournalsDaemon() {
syncJournalDaemon = new Daemon(new Runnable() {
@Override
public void run() {
while(shouldSync) {
try {
if (!journal.isFormatted()) {
LOG.warn("Journal not formatted. Cannot sync.");
} else {
syncJournals();
}
Thread.sleep(journalSyncInterval);
} catch (Throwable t) {
if (!shouldSync) {
if (t instanceof InterruptedException) {
LOG.info("Stopping JournalNode Sync.");
} else {
LOG.warn("JournalNodeSyncer received an exception while " +
"shutting down.", t);
}
break;
} else {
if (t instanceof InterruptedException) {
LOG.warn("JournalNodeSyncer interrupted", t);
break;
}
}
LOG.error(
"JournalNodeSyncer daemon received Runtime exception. ", t);
}
}
}
});
syncJournalDaemon.start();
}
private void syncJournals() {
syncWithJournalAtIndex(journalNodeIndexForSync);
journalNodeIndexForSync = (journalNodeIndexForSync + 1) % numOtherJNs;
}
private void syncWithJournalAtIndex(int index) {
LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":"
+ jn.getBoundIpcAddress().getPort() + " with "
+ otherJNProxies.get(index) + ", journal id: " + jid);
final QJournalProtocolPB jnProxy = otherJNProxies.get(index).jnProxy;
if (jnProxy == null) {
LOG.error("JournalNode Proxy not found.");
return;
}
List<RemoteEditLog> thisJournalEditLogs;
try {
thisJournalEditLogs = journal.getEditLogManifest(0, false).getLogs();
} catch (IOException e) {
LOG.error("Exception in getting local edit log manifest", e);
return;
}
GetEditLogManifestResponseProto editLogManifest;
try {
editLogManifest = jnProxy.getEditLogManifest(null,
GetEditLogManifestRequestProto.newBuilder().setJid(jidProto)
.setSinceTxId(0)
.setInProgressOk(false).build());
} catch (ServiceException e) {
LOG.error("Could not sync with Journal at " +
otherJNProxies.get(journalNodeIndexForSync), e);
return;
}
getMissingLogSegments(thisJournalEditLogs, editLogManifest,
otherJNProxies.get(index));
}
private List<InetSocketAddress> getOtherJournalNodeAddrs() {
URI uri = null;
try {
String uriStr = conf.get(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
if (uriStr == null || uriStr.isEmpty()) {
LOG.warn("Could not construct Shared Edits Uri");
return null;
}
uri = new URI(uriStr);
return Util.getLoggerAddresses(uri,
Sets.newHashSet(jn.getBoundIpcAddress()));
} catch (URISyntaxException e) {
LOG.error("The conf property " + DFSConfigKeys
.DFS_NAMENODE_SHARED_EDITS_DIR_KEY + " not set properly.");
} catch (IOException e) {
LOG.error("Could not parse JournalNode addresses: " + uri);
}
return null;
}
private JournalIdProto convertJournalId(String journalId) {
return QJournalProtocolProtos.JournalIdProto.newBuilder()
.setIdentifier(journalId)
.build();
}
private void getMissingLogSegments(List<RemoteEditLog> thisJournalEditLogs,
GetEditLogManifestResponseProto response,
JournalNodeProxy remoteJNproxy) {
List<RemoteEditLog> otherJournalEditLogs = PBHelper.convert(
response.getManifest()).getLogs();
if (otherJournalEditLogs == null || otherJournalEditLogs.isEmpty()) {
LOG.warn("Journal at " + remoteJNproxy.jnAddr + " has no edit logs");
return;
}
List<RemoteEditLog> missingLogs = getMissingLogList(thisJournalEditLogs,
otherJournalEditLogs);
if (!missingLogs.isEmpty()) {
NamespaceInfo nsInfo = jnStorage.getNamespaceInfo();
for (RemoteEditLog missingLog : missingLogs) {
URL url = null;
boolean success = false;
try {
if (remoteJNproxy.httpServerUrl == null) {
if (response.hasFromURL()) {
URI uri = URI.create(response.getFromURL());
remoteJNproxy.httpServerUrl = getHttpServerURI(uri.getScheme(),
uri.getHost(), uri.getPort());
} else {
remoteJNproxy.httpServerUrl = getHttpServerURI("http",
remoteJNproxy.jnAddr.getHostName(), response.getHttpPort());
}
}
String urlPath = GetJournalEditServlet.buildPath(jid, missingLog
.getStartTxId(), nsInfo);
url = new URL(remoteJNproxy.httpServerUrl, urlPath);
success = downloadMissingLogSegment(url, missingLog);
} catch (MalformedURLException e) {
LOG.error("MalformedURL when download missing log segment", e);
} catch (Exception e) {
LOG.error("Exception in downloading missing log segment from url " +
url, e);
}
if (!success) {
LOG.error("Aborting current sync attempt.");
break;
}
}
}
}
/**
* Returns the logs present in otherJournalEditLogs and missing from
* thisJournalEditLogs.
*/
private List<RemoteEditLog> getMissingLogList(
List<RemoteEditLog> thisJournalEditLogs,
List<RemoteEditLog> otherJournalEditLogs) {
if (thisJournalEditLogs.isEmpty()) {
return otherJournalEditLogs;
}
List<RemoteEditLog> missingEditLogs = Lists.newArrayList();
int thisJnIndex = 0, otherJnIndex = 0;
int thisJnNumLogs = thisJournalEditLogs.size();
int otherJnNumLogs = otherJournalEditLogs.size();
while (thisJnIndex < thisJnNumLogs && otherJnIndex < otherJnNumLogs) {
long localJNstartTxId = thisJournalEditLogs.get(thisJnIndex)
.getStartTxId();
long remoteJNstartTxId = otherJournalEditLogs.get(otherJnIndex)
.getStartTxId();
if (localJNstartTxId == remoteJNstartTxId) {
thisJnIndex++;
otherJnIndex++;
} else if (localJNstartTxId > remoteJNstartTxId) {
missingEditLogs.add(otherJournalEditLogs.get(otherJnIndex));
otherJnIndex++;
} else {
thisJnIndex++;
}
}
if (otherJnIndex < otherJnNumLogs) {
for (; otherJnIndex < otherJnNumLogs; otherJnIndex++) {
missingEditLogs.add(otherJournalEditLogs.get(otherJnIndex));
}
}
return missingEditLogs;
}
private URL getHttpServerURI(String scheme, String hostname, int port)
throws MalformedURLException {
return new URL(scheme, hostname, port, "");
}
/**
* Transfer an edit log from one journal node to another for sync-up.
*/
private boolean downloadMissingLogSegment(URL url, RemoteEditLog log) throws
IOException {
LOG.info("Downloading missing Edit Log from " + url + " to " + jnStorage
.getRoot());
assert log.getStartTxId() > 0 && log.getEndTxId() > 0 : "bad log: " + log;
File finalEditsFile = jnStorage.getFinalizedEditsFile(log.getStartTxId(),
log.getEndTxId());
if (finalEditsFile.exists() && FileUtil.canRead(finalEditsFile)) {
LOG.info("Skipping download of remote edit log " + log + " since it's" +
" already stored locally at " + finalEditsFile);
return true;
}
final long milliTime = Time.monotonicNow();
File tmpEditsFile = jnStorage.getTemporaryEditsFile(log.getStartTxId(), log
.getEndTxId(), milliTime);
try {
Util.doGetUrl(url, ImmutableList.of(tmpEditsFile), jnStorage, false,
logSegmentTransferTimeout, throttler);
} catch (IOException e) {
LOG.error("Download of Edit Log file for Syncing failed. Deleting temp " +
"file: " + tmpEditsFile);
if (!tmpEditsFile.delete()) {
LOG.warn("Deleting " + tmpEditsFile + " has failed");
}
return false;
}
LOG.info("Downloaded file " + tmpEditsFile.getName() + " of size " +
tmpEditsFile.length() + " bytes.");
LOG.debug("Renaming " + tmpEditsFile.getName() + " to "
+ finalEditsFile.getName());
boolean renameSuccess = journal.renameTmpSegment(tmpEditsFile,
finalEditsFile, log.getEndTxId());
if (!renameSuccess) {
//If rename is not successful, delete the tmpFile
LOG.debug("Renaming unsuccessful. Deleting temporary file: "
+ tmpEditsFile);
if (!tmpEditsFile.delete()) {
LOG.warn("Deleting " + tmpEditsFile + " has failed");
}
return false;
}
return true;
}
private static DataTransferThrottler getThrottler(Configuration conf) {
long transferBandwidth =
conf.getLong(DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_RATE_KEY,
DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_RATE_DEFAULT);
DataTransferThrottler throttler = null;
if (transferBandwidth > 0) {
throttler = new DataTransferThrottler(transferBandwidth);
}
return throttler;
}
private class JournalNodeProxy {
private final InetSocketAddress jnAddr;
private final QJournalProtocolPB jnProxy;
private URL httpServerUrl;
JournalNodeProxy(InetSocketAddress jnAddr) throws IOException {
this.jnAddr = jnAddr;
this.jnProxy = RPC.getProxy(QJournalProtocolPB.class,
RPC.getProtocolVersion(QJournalProtocolPB.class), jnAddr, conf);
}
@Override
public String toString() {
return jnAddr.toString();
}
}
}
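
The core of the sync logic above is the manifest comparison in getMissingLogList. Below is a minimal, standalone sketch of that two-pointer walk over two sorted manifests; the LogSegment class and the transaction ids are illustrative stand-ins for RemoteEditLog and real manifests, not part of the patch.

import java.util.ArrayList;
import java.util.List;

public class MissingLogListSketch {

  /** Illustrative stand-in for RemoteEditLog: a finalized segment [startTxId, endTxId]. */
  static class LogSegment {
    final long startTxId;
    final long endTxId;
    LogSegment(long startTxId, long endTxId) {
      this.startTxId = startTxId;
      this.endTxId = endTxId;
    }
    @Override
    public String toString() {
      return "[" + startTxId + "-" + endTxId + "]";
    }
  }

  /**
   * Returns the segments present in the remote manifest but absent from the
   * local one. Both lists are assumed to be sorted by startTxId, as edit log
   * manifests are.
   */
  static List<LogSegment> getMissingLogList(List<LogSegment> local,
      List<LogSegment> remote) {
    if (local.isEmpty()) {
      return remote;
    }
    List<LogSegment> missing = new ArrayList<>();
    int i = 0, j = 0;
    while (i < local.size() && j < remote.size()) {
      long localStart = local.get(i).startTxId;
      long remoteStart = remote.get(j).startTxId;
      if (localStart == remoteStart) {
        i++;                          // segment present on both sides
        j++;
      } else if (localStart > remoteStart) {
        missing.add(remote.get(j));   // remote segment the local journal skipped over
        j++;
      } else {
        i++;                          // local-only segment; nothing to download for it
      }
    }
    // Any remote segments past the end of the local list are also missing.
    while (j < remote.size()) {
      missing.add(remote.get(j++));
    }
    return missing;
  }

  public static void main(String[] args) {
    List<LogSegment> local = new ArrayList<>();
    local.add(new LogSegment(1, 10));
    local.add(new LogSegment(21, 30));
    List<LogSegment> remote = new ArrayList<>();
    remote.add(new LogSegment(1, 10));
    remote.add(new LogSegment(11, 20));
    remote.add(new LogSegment(21, 30));
    remote.add(new LogSegment(31, 40));
    // Prints [[11-20], [31-40]] -- the segments this JN would download.
    System.out.println(getMissingLogList(local, remote));
  }
}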

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.apache.hadoop.util.ToolRunner;
@ -1010,6 +1011,14 @@ public abstract class Storage extends StorageInfo {
return false;
}
public NamespaceInfo getNamespaceInfo() {
return new NamespaceInfo(
getNamespaceID(),
getClusterID(),
null,
getCTime());
}
/**
* Return true if the layout of the given storage directory is from a version
* of Hadoop prior to the introduction of the "current" and "previous"

View File

@ -22,9 +22,11 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.ArrayList;
@ -32,18 +34,23 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
@ -143,7 +150,8 @@ public final class Util {
* storage.
*/
public static MD5Hash doGetUrl(URL url, List<File> localPaths,
Storage dstStorage, boolean getChecksum, int timeout) throws IOException {
Storage dstStorage, boolean getChecksum, int timeout,
DataTransferThrottler throttler) throws IOException {
HttpURLConnection connection;
try {
connection = (HttpURLConnection)
@ -176,7 +184,7 @@ public final class Util {
return receiveFile(url.toExternalForm(), localPaths, dstStorage,
getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
null);
throttler);
}
/**
@ -268,7 +276,7 @@ public final class Util {
long xferKb = received / 1024;
xferCombined += xferSec;
xferStats.append(
String.format(" The fsimage download took %.2fs at %.2f KB/s.",
String.format(" The file download took %.2fs at %.2f KB/s.",
xferSec, xferKb / xferSec));
} finally {
stream.close();
@ -301,7 +309,7 @@ public final class Util {
advertisedSize);
}
}
xferStats.insert(0, String.format("Combined time for fsimage download and" +
xferStats.insert(0, String.format("Combined time for file download and" +
" fsync to all disks took %.2fs.", xferCombined));
LOG.info(xferStats.toString());
@ -350,4 +358,34 @@ public final class Util {
String header = connection.getHeaderField(MD5_HEADER);
return (header != null) ? new MD5Hash(header) : null;
}
public static List<InetSocketAddress> getAddressesList(URI uri)
throws IOException{
String authority = uri.getAuthority();
Preconditions.checkArgument(authority != null && !authority.isEmpty(),
"URI has no authority: " + uri);
String[] parts = StringUtils.split(authority, ';');
for (int i = 0; i < parts.length; i++) {
parts[i] = parts[i].trim();
}
List<InetSocketAddress> addrs = Lists.newArrayList();
for (String addr : parts) {
InetSocketAddress isa = NetUtils.createSocketAddr(
addr, DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT);
if (isa.isUnresolved()) {
throw new UnknownHostException(addr);
}
addrs.add(isa);
}
return addrs;
}
public static List<InetSocketAddress> getLoggerAddresses(URI uri,
Set<InetSocketAddress> addrsToExclude) throws IOException {
List<InetSocketAddress> addrsList = getAddressesList(uri);
addrsList.removeAll(addrsToExclude);
return addrsList;
}
}

View File

@ -763,13 +763,13 @@ public class NNStorage extends Storage implements Closeable,
return new File(sd.getCurrentDir(), getInProgressEditsFileName(startTxId));
}
static File getFinalizedEditsFile(StorageDirectory sd,
public static File getFinalizedEditsFile(StorageDirectory sd,
long startTxId, long endTxId) {
return new File(sd.getCurrentDir(),
getFinalizedEditsFileName(startTxId, endTxId));
}
static File getTemporaryEditsFile(StorageDirectory sd,
public static File getTemporaryEditsFile(StorageDirectory sd,
long startTxId, long endTxId, long timestamp) {
return new File(sd.getCurrentDir(),
getTemporaryEditsFileName(startTxId, endTxId, timestamp));
@ -1106,6 +1106,7 @@ public class NNStorage extends Storage implements Closeable,
return inspector;
}
@Override
public NamespaceInfo getNamespaceInfo() {
return new NamespaceInfo(
getNamespaceID(),

View File

@ -401,7 +401,8 @@ public class TransferFsImage {
public static MD5Hash doGetUrl(URL url, List<File> localPaths,
Storage dstStorage, boolean getChecksum) throws IOException {
return Util.doGetUrl(url, localPaths, dstStorage, getChecksum, timeout);
return Util.doGetUrl(url, localPaths, dstStorage, getChecksum, timeout,
null);
}
private static MD5Hash parseMD5Header(HttpServletRequest request) {

View File

@ -1278,6 +1278,26 @@
</description>
</property>
<property>
<name>dfs.edit.log.transfer.timeout</name>
<value>30000</value>
<description>
Socket timeout for edit log transfer in milliseconds. This timeout
should be configured such that normal edit log transfer for journal
node syncing can complete successfully.
</description>
</property>
<property>
<name>dfs.edit.log.transfer.bandwidthPerSec</name>
<value>0</value>
<description>
Maximum bandwidth used for transferring edit logs between journal nodes
for syncing, in bytes per second.
A default value of 0 indicates that throttling is disabled.
</description>
</property>
<property>
<name>dfs.namenode.support.allow.format</name>
<value>true</value>
@ -3784,6 +3804,27 @@
</description>
</property>
<property>
<name>dfs.journalnode.enable.sync</name>
<value>true</value>
<description>
If true, the journal nodes will sync with each other. The journal nodes
will periodically gossip with other journal nodes to compare edit log
manifests and, if they detect any missing log segment, will download it
from the other journal nodes.
</description>
</property>
<property>
<name>dfs.journalnode.sync.interval</name>
<value>120000</value>
<description>
Time interval, in milliseconds, between two Journal Node syncs.
This configuration takes effect only if the journalnode sync is enabled
by setting the configuration parameter dfs.journalnode.enable.sync to true.
</description>
</property>
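
The sync and edit log transfer settings described above can also be set programmatically. A minimal sketch, assuming the new DFSConfigKeys constants from this patch are on the classpath; the class name JournalSyncConfigExample and the 1 MB/s throttle value are illustrative, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class JournalSyncConfigExample {
  public static Configuration buildConf() {
    Configuration conf = new HdfsConfiguration();
    // Enable periodic JournalNode-to-JournalNode syncing of edit log segments.
    conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true);
    // Attempt a sync every two minutes (value is in milliseconds).
    conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 2 * 60 * 1000L);
    // Socket timeout for downloading a missing segment, in milliseconds.
    conf.setInt(DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY, 30 * 1000);
    // Throttle segment downloads to 1 MB/s; 0 (the default) disables throttling.
    conf.setLong(DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_RATE_KEY, 1024 * 1024);
    return conf;
  }
}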
<property>
<name>dfs.journalnode.kerberos.internal.spnego.principal</name>
<value></value>

View File

@ -255,4 +255,12 @@ public class MiniJournalCluster {
}
}
}
public void setNamenodeSharedEditsConf(String jid) {
URI quorumJournalURI = getQuorumJournalURI(jid);
for (int i = 0; i < nodes.length; i++) {
nodes[i].node.getConf().set(DFSConfigKeys
.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, quorumJournalURI.toString());
}
}
}

View File

@ -101,6 +101,7 @@ public class MiniQJMHACluster {
journalCluster = new MiniJournalCluster.Builder(conf).format(true)
.build();
journalCluster.waitActive();
journalCluster.setNamenodeSharedEditsConf(NAMESERVICE);
URI journalURI = journalCluster.getQuorumJournalURI(NAMESERVICE);
// start cluster with specified NameNodes

View File

@ -0,0 +1,264 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.qjournal;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
.getLogFile;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Random;
/**
* Unit tests for JournalNode syncing of missing edit log segments.
*/
public class TestJournalNodeSync {
private MiniQJMHACluster qjmhaCluster;
private MiniDFSCluster dfsCluster;
private MiniJournalCluster jCluster;
private FileSystem fs;
private FSNamesystem namesystem;
private int editsPerformed = 0;
private final String jid = "ns1";
@Before
public void setUpMiniCluster() throws IOException {
final Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L);
qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2)
.build();
dfsCluster = qjmhaCluster.getDfsCluster();
jCluster = qjmhaCluster.getJournalCluster();
dfsCluster.transitionToActive(0);
fs = dfsCluster.getFileSystem(0);
namesystem = dfsCluster.getNamesystem(0);
}
@After
public void shutDownMiniCluster() throws IOException {
if (qjmhaCluster != null) {
qjmhaCluster.shutdown();
}
}
@Test(timeout=30000)
public void testJournalNodeSync() throws Exception {
File firstJournalDir = jCluster.getJournalDir(0, jid);
File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
.getCurrentDir();
// Generate some edit logs and delete one.
long firstTxId = generateEditLog();
generateEditLog();
File missingLog = deleteEditLog(firstJournalCurrentDir, firstTxId);
GenericTestUtils.waitFor(editLogExists(Lists.newArrayList(missingLog)),
500, 10000);
}
@Test(timeout=30000)
public void testSyncForMultipleMissingLogs() throws Exception {
File firstJournalDir = jCluster.getJournalDir(0, jid);
File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
.getCurrentDir();
// Generate some edit logs and delete two.
long firstTxId = generateEditLog();
long nextTxId = generateEditLog();
List<File> missingLogs = Lists.newArrayList();
missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId));
GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
}
@Test(timeout=30000)
public void testSyncForDiscontinuousMissingLogs() throws Exception {
File firstJournalDir = jCluster.getJournalDir(0, jid);
File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
.getCurrentDir();
// Generate some edit logs and delete two discontinuous logs.
long firstTxId = generateEditLog();
generateEditLog();
long nextTxId = generateEditLog();
List<File> missingLogs = Lists.newArrayList();
missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId));
GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
}
@Test(timeout=30000)
public void testMultipleJournalsMissingLogs() throws Exception {
File firstJournalDir = jCluster.getJournalDir(0, jid);
File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
.getCurrentDir();
File secondJournalDir = jCluster.getJournalDir(1, jid);
StorageDirectory sd = new StorageDirectory(secondJournalDir);
File secondJournalCurrentDir = sd.getCurrentDir();
// Generate some edit logs and delete one log from two journals.
long firstTxId = generateEditLog();
generateEditLog();
List<File> missingLogs = Lists.newArrayList();
missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId));
GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
}
@Test(timeout=60000)
public void testMultipleJournalsMultipleMissingLogs() throws Exception {
File firstJournalDir = jCluster.getJournalDir(0, jid);
File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
.getCurrentDir();
File secondJournalDir = jCluster.getJournalDir(1, jid);
File secondJournalCurrentDir = new StorageDirectory(secondJournalDir)
.getCurrentDir();
File thirdJournalDir = jCluster.getJournalDir(2, jid);
File thirdJournalCurrentDir = new StorageDirectory(thirdJournalDir)
.getCurrentDir();
// Generate some edit logs and delete multiple logs in multiple journals.
long firstTxId = generateEditLog();
long secondTxId = generateEditLog();
long thirdTxId = generateEditLog();
List<File> missingLogs = Lists.newArrayList();
missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId));
missingLogs.add(deleteEditLog(secondJournalCurrentDir, secondTxId));
missingLogs.add(deleteEditLog(thirdJournalCurrentDir, thirdTxId));
GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
}
// Test JournalNode Sync by randomly deleting edit logs from one or two of
// the journals.
@Test(timeout=60000)
public void testRandomJournalMissingLogs() throws Exception {
Random randomJournal = new Random();
List<File> journalCurrentDirs = Lists.newArrayList();
for (int i = 0; i < 3; i++) {
journalCurrentDirs.add(new StorageDirectory(jCluster.getJournalDir(i,
jid)).getCurrentDir());
}
int count = 0;
long lastStartTxId;
int journalIndex;
List<File> missingLogs = Lists.newArrayList();
while (count < 5) {
lastStartTxId = generateEditLog();
// Delete the last edit log segment from randomly selected journal node
journalIndex = randomJournal.nextInt(3);
missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
lastStartTxId));
// Delete the last edit log segment from two journals for some logs
if (count % 2 == 0) {
journalIndex = (journalIndex + 1) % 3;
missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
lastStartTxId));
}
count++;
}
GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
}
private File deleteEditLog(File currentDir, long startTxId)
throws IOException {
EditLogFile logFile = getLogFile(currentDir, startTxId);
while (logFile.isInProgress()) {
dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
logFile = getLogFile(currentDir, startTxId);
}
File deleteFile = logFile.getFile();
Assert.assertTrue("Couldn't delete edit log file", deleteFile.delete());
return deleteFile;
}
/**
* Do a mutative metadata operation on the file system.
*
* @return true if the operation was successful, false otherwise.
*/
private boolean doAnEdit() throws IOException {
return fs.mkdirs(new Path("/tmp", Integer.toString(editsPerformed++)));
}
/**
* Does an edit and rolls the Edit Log.
*
* @return the startTxId of next segment after rolling edits.
*/
private long generateEditLog() throws IOException {
long startTxId = namesystem.getFSImage().getEditLog().getLastWrittenTxId();
Assert.assertTrue("Failed to do an edit", doAnEdit());
dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
return startTxId;
}
private Supplier<Boolean> editLogExists(List<File> editLogs) {
Supplier<Boolean> supplier = new Supplier<Boolean>() {
@Override
public Boolean get() {
for (File editLog : editLogs) {
if (!editLog.exists()) {
return false;
}
}
return true;
}
};
return supplier;
}
}