HBASE-15019 Replication stuck when HDFS is restarted.

Signed-off-by: Sean Busbey <busbey@cloudera.com>
This commit is contained in:
Matteo Bertozzi 2016-01-21 00:05:57 -06:00
parent 486f7612be
commit 67c2fc7cd6
3 changed files with 78 additions and 6 deletions

View File

@ -60,8 +60,10 @@ import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
@ -450,9 +452,9 @@ public class ReplicationSource extends Thread
* @param p path to split * @param p path to split
* @return start time * @return start time
*/ */
private long getTS(Path p) { private static long getTS(Path p) {
String[] parts = p.getName().split("\\."); int tsIndex = p.getName().lastIndexOf('.') + 1;
return Long.parseLong(parts[parts.length-1]); return Long.parseLong(p.getName().substring(tsIndex));
} }
} }
@ -791,7 +793,6 @@ public class ReplicationSource extends Thread
* @return true if we should continue with that file, false if we are over with it * @return true if we should continue with that file, false if we are over with it
*/ */
protected boolean openReader(int sleepMultiplier) { protected boolean openReader(int sleepMultiplier) {
try { try {
try { try {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -872,6 +873,11 @@ public class ReplicationSource extends Thread
// TODO What happens the log is missing in both places? // TODO What happens the log is missing in both places?
} }
} }
} catch (LeaseNotRecoveredException lnre) {
// HBASE-15019 the WAL was not closed due to some hiccup.
LOG.warn(peerClusterZnode + " Try to recover the WAL lease " + currentPath, lnre);
recoverLease(conf, currentPath);
this.reader = null;
} catch (IOException ioe) { } catch (IOException ioe) {
if (ioe instanceof EOFException && isCurrentLogEmpty()) return true; if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
LOG.warn(peerClusterZnode + " Got: ", ioe); LOG.warn(peerClusterZnode + " Got: ", ioe);
@ -881,7 +887,7 @@ public class ReplicationSource extends Thread
// which throws a NPE if we open a file before any data node has the most recent block // which throws a NPE if we open a file before any data node has the most recent block
// Just sleep and retry. Will require re-reading compressed WALs for compressionContext. // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
LOG.warn("Got NPE opening reader, will retry."); LOG.warn("Got NPE opening reader, will retry.");
} else if (sleepMultiplier == maxRetriesMultiplier) { } else if (sleepMultiplier >= maxRetriesMultiplier) {
// TODO Need a better way to determine if a file is really gone but // TODO Need a better way to determine if a file is really gone but
// TODO without scanning all logs dir // TODO without scanning all logs dir
LOG.warn("Waited too long for this file, considering dumping"); LOG.warn("Waited too long for this file, considering dumping");
@ -891,6 +897,22 @@ public class ReplicationSource extends Thread
return true; return true;
} }
private void recoverLease(final Configuration conf, final Path path) {
try {
final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
@Override
public boolean progress() {
LOG.debug("recover WAL lease: " + path);
return isWorkerActive();
}
});
} catch (IOException e) {
LOG.warn("unable to recover lease for WAL: " + path, e);
}
}
/* /*
* Checks whether the current log file is empty, and it is not a recovered queue. This is to * Checks whether the current log file is empty, and it is not a recovered queue. This is to
* handle scenario when in an idle cluster, there is no entry in the current log and we keep on * handle scenario when in an idle cluster, there is no entry in the current log and we keep on

View File

@ -0,0 +1,47 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Thrown when the lease was expected to be recovered,
* but the file can't be opened.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class LeaseNotRecoveredException extends HBaseIOException {
public LeaseNotRecoveredException() {
super();
}
public LeaseNotRecoveredException(String message) {
super(message);
}
public LeaseNotRecoveredException(String message, Throwable cause) {
super(message, cause);
}
public LeaseNotRecoveredException(Throwable cause) {
super(cause);
}
}

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
// imports for things that haven't moved from regionserver.wal yet. // imports for things that haven't moved from regionserver.wal yet.
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
@ -335,8 +336,10 @@ public class WALFactory {
throw iioe; throw iioe;
} }
} }
throw new LeaseNotRecoveredException(e);
} else {
throw e;
} }
throw e;
} }
} }
} catch (IOException ie) { } catch (IOException ie) {