mirror of https://github.com/apache/lucene.git
SOLR-6969: When opening an HDFSTransactionLog for append we must first attempt to recover its lease to prevent data loss.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1655754 13f79535-47bb-0310-9956-ffa450edef68
commit 0068708e14
parent 84ffb0855f
@@ -576,6 +576,9 @@ Bug Fixes

* SOLR-7016: Fix bin\solr.cmd to work in a directory with spaces in the name.
  (Timothy Potter, Uwe Schindler)

* SOLR-6969: When opening an HDFSTransactionLog for append we must first attempt to recover
  its lease to prevent data loss. (Mark Miller, Praneeth Varma, Colin McCabe)

Optimizations
----------------------
@@ -62,6 +62,8 @@
<dependency org="org.apache.hadoop" name="hadoop-annotations" rev="${/org.apache.hadoop/hadoop-annotations}" conf="compile.hadoop"/>
<dependency org="org.apache.hadoop" name="hadoop-auth" rev="${/org.apache.hadoop/hadoop-auth}" conf="compile.hadoop"/>
<dependency org="commons-configuration" name="commons-configuration" rev="${/commons-configuration/commons-configuration}" conf="compile.hadoop"/>
<dependency org="commons-collections" name="commons-collections" rev="${/commons-collections/commons-collections}" conf="compile.hadoop"/>

<dependency org="com.google.protobuf" name="protobuf-java" rev="${/com.google.protobuf/protobuf-java}" conf="compile.hadoop"/>
<dependency org="com.googlecode.concurrentlinkedhashmap" name="concurrentlinkedhashmap-lru" rev="${/com.googlecode.concurrentlinkedhashmap/concurrentlinkedhashmap-lru}" conf="compile.hadoop"/>
@@ -34,6 +34,7 @@ import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.FastOutputStream;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.util.FSHDFSUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,8 +80,9 @@ public class HdfsTransactionLog extends TransactionLog {
    }
    this.tlogFile = tlogFile;

    // TODO: look into forcefully taking over any lease
    if (fs.exists(tlogFile) && openExisting) {
      FSHDFSUtils.recoverFileLease(fs, tlogFile, fs.getConf());

      tlogOutStream = fs.append(tlogFile);
    } else {
      fs.delete(tlogFile, false);
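
For reference, the pattern this hunk introduces can be read in isolation as the sketch below. It is only an illustration under assumptions: the surrounding class and method names are invented, while fs.exists/fs.append/fs.create and FSHDFSUtils.recoverFileLease are the calls actually used in the patch.

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.solr.util.FSHDFSUtils;

public class TlogAppendSketch {  // illustrative class name, not part of the patch
  /**
   * Open a transaction log for append, recovering any dangling HDFS lease first,
   * mirroring the HdfsTransactionLog change above.
   */
  static FSDataOutputStream openForAppend(FileSystem fs, Path tlogFile) throws IOException {
    if (fs.exists(tlogFile)) {
      // A previous writer that died without closing the file may still hold its
      // lease; recover it before appending.
      FSHDFSUtils.recoverFileLease(fs, tlogFile, fs.getConf());
      return fs.append(tlogFile);
    }
    return fs.create(tlogFile);
  }
}

Appending to an existing tlog without first recovering a stale lease is the data-loss scenario the commit message refers to.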
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -46,6 +47,9 @@ public class HdfsUpdateLog extends UpdateLog {
  private volatile FileSystem fs;
  private volatile Path tlogDir;
  private final String confDir;

  // used internally by tests to track total count of failed tran log loads in init
  public static AtomicLong INIT_FAILED_LOGS_COUNT = new AtomicLong();

  public HdfsUpdateLog() {
    this.confDir = null;
@@ -189,6 +193,7 @@ public class HdfsUpdateLog extends UpdateLog {
        addOldLog(oldLog, false); // don't remove old logs on startup since more
                                  // than one may be uncapped.
      } catch (Exception e) {
        INIT_FAILED_LOGS_COUNT.incrementAndGet();
        SolrException.log(log, "Failure to open existing log file (non fatal) "
            + f, e);
        try {
@@ -0,0 +1,197 @@
package org.apache.solr.util;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Borrowed from Apache HBase to recover an HDFS lease.
 */
public class FSHDFSUtils {
  public static Logger log = LoggerFactory.getLogger(FSHDFSUtils.class);

  /**
   * Recover the lease from HDFS, retrying multiple times.
   */
  public static void recoverFileLease(final FileSystem fs, final Path p, Configuration conf) throws IOException {
    // lease recovery not needed for local file system case.
    if (!(fs instanceof DistributedFileSystem)) return;
    recoverDFSFileLease((DistributedFileSystem) fs, p, conf);
  }

  /*
   * Run the dfs recover lease. recoverLease is asynchronous. It returns:
   *  - false when it starts the lease recovery (i.e. lease recovery not *yet* done)
   *  - true when the lease recovery has succeeded or the file is closed.
   * But, we have to be careful. Each time we call recoverLease, it starts the recover lease
   * process over from the beginning. We could put ourselves in a situation where we are
   * doing nothing but starting a recovery, interrupting it to start again, and so on.
   * The findings over in HBASE-8354 have it that the namenode will try to recover the lease
   * on the file's primary node. If all is well, it should return near immediately. But,
   * as is common, it is the very primary node that has crashed and so the namenode will be
   * stuck waiting on a socket timeout before it will ask another datanode to start the
   * recovery. It does not help if we call recoverLease in the meantime and in particular,
   * subsequent to the socket timeout, a recoverLease invocation will cause us to start
   * over from square one (possibly waiting on socket timeout against primary node). So,
   * in the below, we do the following:
   * 1. Call recoverLease.
   * 2. If it returns true, break.
   * 3. If it returns false, wait a few seconds and then call it again.
   * 4. If it returns true, break.
   * 5. If it returns false, wait for what we think the datanode socket timeout is
   *    (configurable) and then try again.
   * 6. If it returns true, break.
   * 7. If it returns false, repeat starting at step 5. above.
   *
   * If HDFS-4525 is available, call it every second and we might be able to exit early.
   */
  static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p, final Configuration conf)
      throws IOException {
    log.info("Recovering lease on dfs file " + p);
    long startWaiting = System.nanoTime();
    // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
    // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
    // beyond that limit 'to be safe'.
    long recoveryTimeout = TimeUnit.NANOSECONDS.convert(conf.getInt("solr.hdfs.lease.recovery.timeout", 900000), TimeUnit.MILLISECONDS) + startWaiting;
    // This setting should be a little bit above what the cluster dfs heartbeat is set to.
    long firstPause = conf.getInt("solr.hdfs.lease.recovery.first.pause", 4000);
    // This should be set to how long it'll take for us to timeout against primary datanode if it
    // is dead. We set it to 61 seconds, 1 second more than the default READ_TIMEOUT in HDFS, the
    // default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY.
    long subsequentPause = conf.getInt("solr.hdfs.lease.recovery.dfs.timeout", 61 * 1000);

    Method isFileClosedMeth = null;
    // whether we need to look for isFileClosed method
    boolean findIsFileClosedMeth = true;
    boolean recovered = false;
    // We break the loop if we succeed the lease recovery, timeout, or we throw an exception.
    for (int nbAttempt = 0; !recovered; nbAttempt++) {
      recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
      if (recovered) break;
      if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) break;
      try {
        // On the first time through wait the short 'firstPause'.
        if (nbAttempt == 0) {
          Thread.sleep(firstPause);
        } else {
          // Cycle here until subsequentPause elapses. While spinning, check isFileClosed if
          // available (should be in hadoop 2.0.5... not in hadoop 1 though).
          long localStartWaiting = System.nanoTime();
          while ((System.nanoTime() - localStartWaiting) < subsequentPause) {
            Thread.sleep(conf.getInt("solr.hdfs.lease.recovery.pause", 1000));
            if (findIsFileClosedMeth) {
              try {
                isFileClosedMeth = dfs.getClass().getMethod("isFileClosed",
                    new Class[]{ Path.class });
              } catch (NoSuchMethodException nsme) {
                log.debug("isFileClosed not available");
              } finally {
                findIsFileClosedMeth = false;
              }
            }
            if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
              recovered = true;
              break;
            }
          }
        }
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
    return recovered;
  }

  static boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout,
      final int nbAttempt, final Path p, final long startWaiting) {
    if (recoveryTimeout < System.nanoTime()) {
      log.warn("Cannot recoverLease after trying for " +
          conf.getInt("solr.hdfs.lease.recovery.timeout", 900000) +
          "ms (solr.hdfs.lease.recovery.timeout); continuing, but may be DATALOSS!!!; " +
          getLogMessageDetail(nbAttempt, p, startWaiting));
      return true;
    }
    return false;
  }

  /**
   * Try to recover the lease.
   * @return True if dfs#recoverLease came by true.
   */
  static boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p, final long startWaiting)
      throws FileNotFoundException {
    boolean recovered = false;
    try {
      recovered = dfs.recoverLease(p);
      log.info("recoverLease=" + recovered + ", " +
          getLogMessageDetail(nbAttempt, p, startWaiting));
    } catch (IOException e) {
      if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
        // This exception comes out instead of FNFE, fix it
        throw new FileNotFoundException("The given transactionlog file wasn't found at " + p);
      } else if (e instanceof FileNotFoundException) {
        throw (FileNotFoundException) e;
      }
      log.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
    }
    return recovered;
  }

  /**
   * @return Detail to append to any log message around lease recovering.
   */
  private static String getLogMessageDetail(final int nbAttempt, final Path p, final long startWaiting) {
    return "attempt=" + nbAttempt + " on file=" + p + " after " +
        TimeUnit.MILLISECONDS.convert(System.nanoTime() - startWaiting, TimeUnit.NANOSECONDS) + "ms";
  }

  /**
   * Call HDFS-4525 isFileClosed if it is available.
   *
   * @return True if file is closed.
   */
  private static boolean isFileClosed(final DistributedFileSystem dfs, final Method m, final Path p) {
    try {
      return (Boolean) m.invoke(dfs, p);
    } catch (SecurityException e) {
      log.warn("No access", e);
    } catch (Exception e) {
      log.warn("Failed invocation for " + p.toString(), e);
    }
    return false;
  }

}
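
The retry cadence in recoverDFSFileLease is driven entirely by Hadoop Configuration settings, so it can be tuned per cluster. A minimal sketch of overriding those settings before invoking recovery follows; the property keys and the values shown are the defaults read by the class above, while the class name and the command-line argument are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.solr.util.FSHDFSUtils;

public class LeaseRecoveryTuning {  // illustrative class, not part of the patch
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Give up on lease recovery after this long (default 900000 ms = 15 minutes).
    conf.setInt("solr.hdfs.lease.recovery.timeout", 900000);
    // First pause; should sit a little above the cluster's dfs heartbeat (default 4000 ms).
    conf.setInt("solr.hdfs.lease.recovery.first.pause", 4000);
    // Subsequent pauses; should cover a dead primary datanode's socket timeout (default 61000 ms).
    conf.setInt("solr.hdfs.lease.recovery.dfs.timeout", 61 * 1000);
    // Poll interval while waiting for recovery / isFileClosed (default 1000 ms).
    conf.setInt("solr.hdfs.lease.recovery.pause", 1000);

    FileSystem fs = FileSystem.get(conf);
    Path tlog = new Path(args[0]);  // path to a tlog file whose lease may be dangling
    FSHDFSUtils.recoverFileLease(fs, tlog, conf);
  }
}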
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.cloud.CollectionsAPIDistributedZkTest;
import org.apache.solr.update.HdfsUpdateLog;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -46,6 +47,7 @@ public class HdfsCollectionsAPIDistributedZkTest extends CollectionsAPIDistribut

  @AfterClass
  public static void teardownClass() throws Exception {
    assertEquals(0, HdfsUpdateLog.INIT_FAILED_LOGS_COUNT.get());
    HdfsTestUtil.teardownClass(dfsCluster);
    System.clearProperty("solr.hdfs.home");
    System.clearProperty("solr.hdfs.blockcache.enabled");
@@ -10,10 +10,14 @@ import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.util.IOUtils;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -38,6 +42,8 @@ public class HdfsTestUtil {

  private static Map<MiniDFSCluster,Timer> timers = new ConcurrentHashMap<>();

  private static FSDataOutputStream badTlogOutStream;

  public static MiniDFSCluster setupClass(String dir) throws Exception {
    return setupClass(dir, true);
  }
@@ -69,10 +75,11 @@ public class HdfsTestUtil {

    final MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, dataNodes, true, null);
    dfsCluster.waitActive();

    System.setProperty("solr.hdfs.home", getDataDir(dfsCluster, "solr_hdfs_home"));

    if (safeModeTesting) {
      int rndMode = LuceneTestCase.random().nextInt(10);
      if (safeModeTesting && rndMode > 4) {
        NameNodeAdapter.enterSafeMode(dfsCluster.getNameNode(), false);

        int rnd = LuceneTestCase.random().nextInt(10000);
@@ -86,6 +93,13 @@ public class HdfsTestUtil {
      }, rnd);

      timers.put(dfsCluster, timer);
    } else {
      // force a lease recovery by creating a tlog file and not closing it
      URI uri = dfsCluster.getURI();
      Path hdfsDirPath = new Path(uri.toString() + "/solr/collection1/core_node1/data/tlog/tlog.0000000000000000000");
      // tran log already being created testing
      FileSystem fs = FileSystem.newInstance(hdfsDirPath.toUri(), conf);
      badTlogOutStream = fs.create(hdfsDirPath);
    }

    SolrTestCaseJ4.useFactory("org.apache.solr.core.HdfsDirectoryFactory");
@@ -105,6 +119,10 @@ public class HdfsTestUtil {
      dfsCluster.shutdown();
    }

    if (badTlogOutStream != null) {
      IOUtils.closeQuietly(badTlogOutStream);
    }

    // TODO: we HACK around HADOOP-9643
    if (savedLocale != null) {
      Locale.setDefault(savedLocale);