mirror of https://github.com/apache/lucene.git
SOLR-7113: Multiple calls to UpdateLog#init is not thread safe with respect to the HDFS FileSystem client object usage.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1662324 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
00ab5bacc6
commit
f2c9067e59
|
@ -148,6 +148,9 @@ Bug Fixes
|
|||
* SOLR-7104: Propagate property prefix parameters for ADDREPLICA Collections API call.
|
||||
(Varun Thacker via Anshum Gupta)
|
||||
|
||||
* SOLR-7113: Multiple calls to UpdateLog#init is not thread safe with respect to the
|
||||
HDFS FileSystem client object usage. (Mark Miller, Vamsee Yarlagadda)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -64,7 +64,7 @@ public final class CommitTracker implements Runnable {
|
|||
private final SolrCore core;
|
||||
|
||||
private final boolean softCommit;
|
||||
private final boolean openSearcher;
|
||||
private boolean openSearcher;
|
||||
private final boolean waitSearcher = true;
|
||||
|
||||
private String name;
|
||||
|
@ -256,4 +256,9 @@ public final class CommitTracker implements Runnable {
|
|||
public void setTimeUpperBound(long timeUpperBound) {
|
||||
this.timeUpperBound = timeUpperBound;
|
||||
}
|
||||
|
||||
// only for testing - not thread safe
|
||||
public void setOpenSearcher(boolean openSearcher) {
|
||||
this.openSearcher = openSearcher;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,7 +44,8 @@ import org.apache.solr.util.HdfsUtil;
|
|||
/** @lucene.experimental */
|
||||
public class HdfsUpdateLog extends UpdateLog {
|
||||
|
||||
private volatile FileSystem fs;
|
||||
private final Object fsLock = new Object();
|
||||
private FileSystem fs;
|
||||
private volatile Path tlogDir;
|
||||
private final String confDir;
|
||||
|
||||
|
@ -101,51 +102,42 @@ public class HdfsUpdateLog extends UpdateLog {
|
|||
// ulogDir from CoreDescriptor overrides
|
||||
String ulogDir = core.getCoreDescriptor().getUlogDir();
|
||||
|
||||
if (ulogDir != null) {
|
||||
dataDir = ulogDir;
|
||||
}
|
||||
if (dataDir == null || dataDir.length()==0) {
|
||||
dataDir = core.getDataDir();
|
||||
}
|
||||
|
||||
if (!core.getDirectoryFactory().isAbsolute(dataDir)) {
|
||||
try {
|
||||
dataDir = core.getDirectoryFactory().getDataHome(core.getCoreDescriptor());
|
||||
} catch (IOException e) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, e);
|
||||
}
|
||||
}
|
||||
|
||||
FileSystem oldFs = fs;
|
||||
|
||||
try {
|
||||
fs = FileSystem.newInstance(new Path(dataDir).toUri(), getConf());
|
||||
} catch (IOException e) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, e);
|
||||
}
|
||||
|
||||
try {
|
||||
if (oldFs != null) {
|
||||
oldFs.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, e);
|
||||
}
|
||||
|
||||
this.uhandler = uhandler;
|
||||
|
||||
if (dataDir.equals(lastDataDir)) {
|
||||
if (debug) {
|
||||
log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", next id=" + id,
|
||||
" this is a reopen... nothing else to do.");
|
||||
synchronized (fsLock) {
|
||||
// just like dataDir, we do not allow
|
||||
// moving the tlog dir on reload
|
||||
if (fs == null) {
|
||||
if (ulogDir != null) {
|
||||
dataDir = ulogDir;
|
||||
}
|
||||
if (dataDir == null || dataDir.length() == 0) {
|
||||
dataDir = core.getDataDir();
|
||||
}
|
||||
|
||||
if (!core.getDirectoryFactory().isAbsolute(dataDir)) {
|
||||
try {
|
||||
dataDir = core.getDirectoryFactory().getDataHome(core.getCoreDescriptor());
|
||||
} catch (IOException e) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, e);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
fs = FileSystem.get(new Path(dataDir).toUri(), getConf());
|
||||
} catch (IOException e) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, e);
|
||||
}
|
||||
} else {
|
||||
if (debug) {
|
||||
log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", next id=" + id,
|
||||
" this is a reopen or double init ... nothing else to do.");
|
||||
}
|
||||
versionInfo.reload();
|
||||
return;
|
||||
}
|
||||
|
||||
versionInfo.reload();
|
||||
|
||||
// on a normal reopen, we currently shouldn't have to do anything
|
||||
return;
|
||||
}
|
||||
lastDataDir = dataDir;
|
||||
|
||||
tlogDir = new Path(dataDir, TLOG_NAME);
|
||||
while (true) {
|
||||
try {
|
||||
|
@ -298,8 +290,15 @@ public class HdfsUpdateLog extends UpdateLog {
|
|||
if (tlog == null) {
|
||||
String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN,
|
||||
TLOG_NAME, id);
|
||||
tlog = new HdfsTransactionLog(fs, new Path(tlogDir, newLogName),
|
||||
HdfsTransactionLog ntlog = new HdfsTransactionLog(fs, new Path(tlogDir, newLogName),
|
||||
globalStrings);
|
||||
tlog = ntlog;
|
||||
|
||||
if (tlog != ntlog) {
|
||||
ntlog.deleteOnClose = false;
|
||||
ntlog.decref();
|
||||
ntlog.forceClose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -168,9 +168,10 @@ public class TransactionLog {
|
|||
}
|
||||
} else {
|
||||
if (start > 0) {
|
||||
log.error("New transaction log already exists:" + tlogFile + " size=" + raf.length());
|
||||
log.warn("New transaction log already exists:" + tlogFile + " size=" + raf.length());
|
||||
return;
|
||||
}
|
||||
assert start==0;
|
||||
|
||||
if (start > 0) {
|
||||
raf.setLength(0);
|
||||
}
|
||||
|
|
|
@ -129,6 +129,8 @@ public abstract class UpdateHandler implements SolrInfoMBean {
|
|||
ulog.clearLog(core, ulogPluginInfo);
|
||||
}
|
||||
|
||||
log.info("Using UpdateLog implementation: " + ulog.getClass().getName());
|
||||
|
||||
ulog.init(ulogPluginInfo);
|
||||
|
||||
ulog.init(this, core);
|
||||
|
|
|
@ -0,0 +1,144 @@
|
|||
package org.apache.solr.update;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.SolrTestCaseJ4.SuppressObjectReleaseTracker;
|
||||
import org.apache.solr.cloud.hdfs.HdfsTestUtil;
|
||||
import org.apache.solr.common.util.IOUtils;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
|
||||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope.Scope;
|
||||
|
||||
@ThreadLeakScope(Scope.NONE) // hdfs mini cluster currently leaks threads
|
||||
@SuppressObjectReleaseTracker(bugUrl = "https://issues.apache.org/jira/browse/SOLR-7115")
|
||||
public class TestHdfsUpdateLog extends SolrTestCaseJ4 {
|
||||
|
||||
private static MiniDFSCluster dfsCluster;
|
||||
|
||||
private static String hdfsUri;
|
||||
|
||||
private static FileSystem fs;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
|
||||
hdfsUri = dfsCluster.getFileSystem().getUri().toString();
|
||||
|
||||
try {
|
||||
URI uri = new URI(hdfsUri);
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean("fs.hdfs.impl.disable.cache", true);
|
||||
fs = FileSystem.get(uri, conf);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
System.setProperty("solr.ulog.dir", hdfsUri + "/solr/shard1");
|
||||
|
||||
initCore("solrconfig-tlog.xml","schema15.xml");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws Exception {
|
||||
System.clearProperty("solr.ulog.dir");
|
||||
System.clearProperty("test.build.data");
|
||||
System.clearProperty("test.cache.data");
|
||||
deleteCore();
|
||||
IOUtils.closeQuietly(fs);
|
||||
fs = null;
|
||||
HdfsTestUtil.teardownClass(dfsCluster);
|
||||
|
||||
hdfsDataDir = null;
|
||||
dfsCluster = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFSThreadSafety() throws Exception {
|
||||
|
||||
final SolrQueryRequest req = req();
|
||||
final UpdateHandler uhandler = req.getCore().getUpdateHandler();
|
||||
((DirectUpdateHandler2) uhandler).getCommitTracker().setTimeUpperBound(100);
|
||||
((DirectUpdateHandler2) uhandler).getCommitTracker().setOpenSearcher(false);
|
||||
final UpdateLog ulog = uhandler.getUpdateLog();
|
||||
|
||||
clearIndex();
|
||||
assertU(commit());
|
||||
|
||||
// we hammer on init in a background thread to make
|
||||
// sure we don't run into any filesystem already closed
|
||||
// problems (SOLR-7113)
|
||||
|
||||
Thread thread = new Thread() {
|
||||
public void run() {
|
||||
int cnt = 0;
|
||||
while (true) {
|
||||
ulog.init(uhandler, req.getCore());
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
|
||||
}
|
||||
if (cnt++ > 50) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Thread thread2 = new Thread() {
|
||||
public void run() {
|
||||
int cnt = 0;
|
||||
while (true) {
|
||||
assertU(adoc("id", Integer.toString(cnt)));
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
|
||||
}
|
||||
if (cnt++ > 500) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
thread.start();
|
||||
thread2.start();
|
||||
thread.join();
|
||||
thread2.join();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -168,6 +168,18 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
|
|||
public String bugUrl() default "None";
|
||||
}
|
||||
|
||||
/**
|
||||
* Annotation for test classes that want to disable ObjectReleaseTracker
|
||||
*/
|
||||
@Documented
|
||||
@Inherited
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Target(ElementType.TYPE)
|
||||
public @interface SuppressObjectReleaseTracker {
|
||||
/** Point to JIRA entry. */
|
||||
public String bugUrl();
|
||||
}
|
||||
|
||||
// these are meant to be accessed sequentially, but are volatile just to ensure any test
|
||||
// thread will read the latest value
|
||||
protected static volatile SSLTestConfig sslConfig;
|
||||
|
@ -214,7 +226,13 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
|
|||
deleteCore();
|
||||
resetExceptionIgnores();
|
||||
endTrackingSearchers();
|
||||
assertTrue("Some resources were not closed, shutdown, or released.", ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty());
|
||||
if (!RandomizedContext.current().getTargetClass().isAnnotationPresent(SuppressObjectReleaseTracker.class)) {
|
||||
assertTrue("Some resources were not closed, shutdown, or released.", ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty());
|
||||
} else {
|
||||
if (!ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty()) {
|
||||
log.warn("Some resources were not closed, shutdown, or released. Remove the SuppressObjectReleaseTracker annotation to get more information on the fail.");
|
||||
}
|
||||
}
|
||||
resetFactory();
|
||||
coreName = DEFAULT_TEST_CORENAME;
|
||||
} finally {
|
||||
|
|
Loading…
Reference in New Issue