Merge branch 'master' into jira/lucene-5438-nrt-replication

Conflicts:
	lucene/core/src/java/org/apache/lucene/index/IndexFileDeleter.java
	lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
This commit is contained in:
Mike McCandless 2016-02-07 14:03:13 -05:00
commit 9ba62e5e38
20 changed files with 364 additions and 150 deletions

View File

@ -1,4 +1,4 @@
/** /*
* Copyright (C) 1999-2010, International Business Machines * Copyright (C) 1999-2010, International Business Machines
* Corporation and others. All Rights Reserved. * Corporation and others. All Rights Reserved.
* *

View File

@ -696,49 +696,23 @@ final class IndexFileDeleter implements Closeable {
ensureOpen(); ensureOpen();
if (infoStream.isEnabled("IFD")) { if (infoStream.isEnabled("IFD")) {
infoStream.message("IFD", "delete \"" + names + "\""); infoStream.message("IFD", "delete " + names + "");
} }
// We make two passes, first deleting any segments_N files, second deleting all the rest. We do this so that if we throw exc or JVM // We make two passes, first deleting any segments_N files, second deleting the rest. We do this so that if we throw exc or JVM
// crashes during deletions, we don't leave the index in an "apparently corrupt" state: // crashes during deletions, even when not on Windows, we don't leave the index in an "apparently corrupt" state:
for(String name : names) { for(String name : names) {
if (name.startsWith(IndexFileNames.SEGMENTS) == false) { if (name.startsWith(IndexFileNames.SEGMENTS) == false) {
continue; continue;
} }
try {
directory.deleteFile(name); directory.deleteFile(name);
} catch (NoSuchFileException | FileNotFoundException e) {
// IndexWriter should only ask us to delete files it knows it wrote, so if we hit this, something is wrong!
if (Constants.WINDOWS) {
// TODO: can we remove this OS-specific hacky logic? If windows deleteFile is buggy, we should instead contain this workaround in
// a WindowsFSDirectory ...
// LUCENE-6684: we suppress this assert for Windows, since a file could be in a confusing "pending delete" state, and falsely
// return NSFE/FNFE
} else {
throw e;
}
}
} }
for(String name : names) { for(String name : names) {
if (name.startsWith(IndexFileNames.SEGMENTS) == true) { if (name.startsWith(IndexFileNames.SEGMENTS) == true) {
continue; continue;
} }
try {
directory.deleteFile(name); directory.deleteFile(name);
} catch (NoSuchFileException | FileNotFoundException e) {
// IndexWriter should only ask us to delete files it knows it wrote, so if we hit this, something is wrong!
if (Constants.WINDOWS) {
// TODO: can we remove this OS-specific hacky logic? If windows deleteFile is buggy, we should instead contain this workaround in
// a WindowsFSDirectory ...
// LUCENE-6684: we suppress this assert for Windows, since a file could be in a confusing "pending delete" state, and falsely
// return NSFE/FNFE
} else {
throw e;
}
}
} }
} }

View File

@ -765,7 +765,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*/ */
public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException { public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
if (d instanceof FSDirectory && ((FSDirectory) d).checkPendingDeletions()) { if (d instanceof FSDirectory && ((FSDirectory) d).checkPendingDeletions()) {
throw new IllegalArgumentException("Directory " + d + " is still has pending deleted files; cannot initialize IndexWriter"); throw new IllegalArgumentException("Directory " + d + " still has pending deleted files; cannot initialize IndexWriter");
} }
conf.setIndexWriter(this); // prevent reuse by other instances conf.setIndexWriter(this); // prevent reuse by other instances

View File

@ -327,7 +327,7 @@ public abstract class FSDirectory extends BaseDirectory {
if (pendingDeletes.contains(name)) { if (pendingDeletes.contains(name)) {
throw new NoSuchFileException("file \"" + name + "\" is already pending delete"); throw new NoSuchFileException("file \"" + name + "\" is already pending delete");
} }
privateDeleteFile(name); privateDeleteFile(name, false);
maybeDeletePendingFiles(); maybeDeletePendingFiles();
} }
@ -347,7 +347,7 @@ public abstract class FSDirectory extends BaseDirectory {
// Clone the set since we mutate it in privateDeleteFile: // Clone the set since we mutate it in privateDeleteFile:
for(String name : new HashSet<>(pendingDeletes)) { for(String name : new HashSet<>(pendingDeletes)) {
privateDeleteFile(name); privateDeleteFile(name, true);
} }
} }
} }
@ -363,14 +363,21 @@ public abstract class FSDirectory extends BaseDirectory {
} }
} }
private void privateDeleteFile(String name) throws IOException { private void privateDeleteFile(String name, boolean isPendingDelete) throws IOException {
try { try {
Files.delete(directory.resolve(name)); Files.delete(directory.resolve(name));
pendingDeletes.remove(name); pendingDeletes.remove(name);
} catch (NoSuchFileException | FileNotFoundException e) { } catch (NoSuchFileException | FileNotFoundException e) {
// We were asked to delete a non-existent file: // We were asked to delete a non-existent file:
pendingDeletes.remove(name); pendingDeletes.remove(name);
if (isPendingDelete && Constants.WINDOWS) {
// TODO: can we remove this OS-specific hacky logic? If windows deleteFile is buggy, we should instead contain this workaround in
// a WindowsFSDirectory ...
// LUCENE-6684: we suppress this check for Windows, since a file could be in a confusing "pending delete" state, failing the first
// delete attempt with access denied and then apparently falsely failing here when we try ot delete it again, with NSFE/FNFE
} else {
throw e; throw e;
}
} catch (IOException ioe) { } catch (IOException ioe) {
// On windows, a file delete can fail because there's still an open // On windows, a file delete can fail because there's still an open
// file handle against it. We record this in pendingDeletes and // file handle against it. We record this in pendingDeletes and

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException; import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.NoSuchFileException; import java.nio.file.NoSuchFileException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -118,7 +119,9 @@ public class FileSwitchDirectory extends Directory {
if (exc != null && files.isEmpty()) { if (exc != null && files.isEmpty()) {
throw exc; throw exc;
} }
return files.toArray(new String[files.size()]); String[] result = files.toArray(new String[files.size()]);
Arrays.sort(result);
return result;
} }
/** Utility method to return a file's extension. */ /** Utility method to return a file's extension. */

View File

@ -106,7 +106,9 @@ public class NRTCachingDirectory extends FilterDirectory implements Accountable
"cache=" + Arrays.toString(cache.listAll()) + ",delegate=" + Arrays.toString(in.listAll())); "cache=" + Arrays.toString(cache.listAll()) + ",delegate=" + Arrays.toString(in.listAll()));
} }
} }
return files.toArray(new String[files.size()]); String[] result = files.toArray(new String[files.size()]);
Arrays.sort(result);
return result;
} }
@Override @Override

View File

@ -17,6 +17,10 @@
package org.apache.lucene; package org.apache.lucene;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field; import org.apache.lucene.document.Field;
@ -24,16 +28,16 @@ import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogMergePolicy; import org.apache.lucene.index.LogMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergePolicy.OneMerge; import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler; import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.MergeTrigger; import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.PrintStreamInfoStream;
import java.io.IOException;
/** /**
* Holds tests cases to verify external APIs are accessible * Holds tests cases to verify external APIs are accessible
@ -94,10 +98,15 @@ public class TestMergeSchedulerExternal extends LuceneTestCase {
Field idField = newStringField("id", "", Field.Store.YES); Field idField = newStringField("id", "", Field.Store.YES);
doc.add(idField); doc.add(idField);
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())) IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()))
.setMergeScheduler(new MyMergeScheduler()) .setMergeScheduler(new MyMergeScheduler())
.setMaxBufferedDocs(2).setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH) .setMaxBufferedDocs(2).setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH)
.setMergePolicy(newLogMergePolicy())); .setMergePolicy(newLogMergePolicy());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
iwc.setInfoStream(new PrintStreamInfoStream(new PrintStream(baos, true, IOUtils.UTF_8)));
IndexWriter writer = new IndexWriter(dir, iwc);
LogMergePolicy logMP = (LogMergePolicy) writer.getConfig().getMergePolicy(); LogMergePolicy logMP = (LogMergePolicy) writer.getConfig().getMergePolicy();
logMP.setMergeFactor(10); logMP.setMergeFactor(10);
for(int i=0;i<20;i++) { for(int i=0;i<20;i++) {
@ -111,9 +120,15 @@ public class TestMergeSchedulerExternal extends LuceneTestCase {
} }
writer.rollback(); writer.rollback();
try {
assertTrue(mergeThreadCreated); assertTrue(mergeThreadCreated);
assertTrue(mergeCalled); assertTrue(mergeCalled);
assertTrue(excCalled); assertTrue(excCalled);
} catch (AssertionError ae) {
System.out.println("TEST FAILED; IW infoStream output:");
System.out.println(baos.toString(IOUtils.UTF_8));
throw ae;
}
dir.close(); dir.close();
} }

View File

@ -488,4 +488,80 @@ public class TestIndexFileDeleter extends LuceneTestCase {
w.close(); w.close();
dir.close(); dir.close();
} }
// LUCENE-6835: make sure best-effort to not create an "apparently but not really" corrupt index is working:
public void testExcInDeleteFile() throws Throwable {
int iters = atLeast(10);
for(int iter=0;iter<iters;iter++) {
if (VERBOSE) {
System.out.println("TEST: iter=" + iter);
}
MockDirectoryWrapper dir = newMockDirectory();
final AtomicBoolean doFailExc = new AtomicBoolean();
dir.failOn(new MockDirectoryWrapper.Failure() {
@Override
public void eval(MockDirectoryWrapper dir) throws IOException {
if (doFailExc.get() && random().nextInt(4) == 1) {
Exception e = new Exception();
StackTraceElement stack[] = e.getStackTrace();
for (int i = 0; i < stack.length; i++) {
if (stack[i].getClassName().equals(MockDirectoryWrapper.class.getName()) && stack[i].getMethodName().equals("deleteFile")) {
throw new MockDirectoryWrapper.FakeIOException();
}
}
}
}
});
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setMergeScheduler(new SerialMergeScheduler());
RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc);
w.addDocument(new Document());
// makes segments_1
if (VERBOSE) {
System.out.println("TEST: now commit");
}
w.commit();
w.addDocument(new Document());
doFailExc.set(true);
if (VERBOSE) {
System.out.println("TEST: now close");
}
try {
w.close();
if (VERBOSE) {
System.out.println("TEST: no exception (ok)");
}
} catch (RuntimeException re) {
assertTrue(re.getCause() instanceof MockDirectoryWrapper.FakeIOException);
// good
if (VERBOSE) {
System.out.println("TEST: got expected exception:");
re.printStackTrace(System.out);
}
} catch (MockDirectoryWrapper.FakeIOException fioe) {
// good
if (VERBOSE) {
System.out.println("TEST: got expected exception:");
fioe.printStackTrace(System.out);
}
}
doFailExc.set(false);
assertFalse(w.w.isOpen());
for(String name : dir.listAll()) {
if (name.startsWith(IndexFileNames.SEGMENTS)) {
if (VERBOSE) {
System.out.println("TEST: now read " + name);
}
SegmentInfos.readCommit(dir, name);
}
}
dir.close();
}
}
} }

View File

@ -78,6 +78,7 @@ import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.NoLockFactory; import org.apache.lucene.store.NoLockFactory;
@ -1268,8 +1269,14 @@ public class TestIndexWriter extends LuceneTestCase {
FileSystem fs = new WindowsFS(path.getFileSystem()).getFileSystem(URI.create("file:///")); FileSystem fs = new WindowsFS(path.getFileSystem()).getFileSystem(URI.create("file:///"));
Path indexPath = new FilterPath(path, fs); Path indexPath = new FilterPath(path, fs);
// NOTE: cannot use MMapDir, because WindowsFS doesn't see/think it keeps file handles open? // NOTE: on Unix, we cannot use MMapDir, because WindowsFS doesn't see/think it keeps file handles open. Yet, on Windows, we MUST use
FSDirectory dir = new NIOFSDirectory(indexPath); // MMapDir because the windows OS will in fact prevent file deletion for us, and fails otherwise:
FSDirectory dir;
if (Constants.WINDOWS) {
dir = new MMapDirectory(indexPath);
} else {
dir = new NIOFSDirectory(indexPath);
}
MergePolicy mergePolicy = newLogMergePolicy(true); MergePolicy mergePolicy = newLogMergePolicy(true);

View File

@ -1,4 +1,4 @@
/** /*
* Copyright (c) 2005 Bruno Martins * Copyright (c) 2005 Bruno Martins
* All rights reserved. * All rights reserved.
* *

View File

@ -1295,4 +1295,30 @@ public abstract class BaseDirectoryTestCase extends LuceneTestCase {
assertTrue(Arrays.asList(fsDir.listAll()).contains(fileName)); assertTrue(Arrays.asList(fsDir.listAll()).contains(fileName));
} }
} }
public void testListAllIsSorted() throws IOException {
try (Directory dir = getDirectory(createTempDir())) {
int count = atLeast(20);
Set<String> names = new HashSet<>();
while(names.size() < count) {
String name = TestUtil.randomSimpleString(random());
if (name.length() == 0) {
continue;
}
if (random().nextInt(5) == 1) {
IndexOutput out = dir.createTempOutput(name, "foo", IOContext.DEFAULT);
names.add(out.getName());
out.close();
} else if (names.contains(name) == false) {
IndexOutput out = dir.createOutput(name, IOContext.DEFAULT);
names.add(out.getName());
out.close();
}
}
String[] actual = dir.listAll();
String[] expected = actual.clone();
Arrays.sort(expected);
assertEquals(expected, actual);
}
}
} }

View File

@ -210,6 +210,7 @@ public abstract class BaseLockFactoryTestCase extends LuceneTestCase {
@Override @Override
public void run() { public void run() {
IndexWriter writer = null; IndexWriter writer = null;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
for(int i=0;i<this.numIteration;i++) { for(int i=0;i<this.numIteration;i++) {
if (VERBOSE) { if (VERBOSE) {
System.out.println("TEST: WriterThread iter=" + i); System.out.println("TEST: WriterThread iter=" + i);
@ -218,13 +219,17 @@ public abstract class BaseLockFactoryTestCase extends LuceneTestCase {
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random())); IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
// We only print the IW infoStream output on exc, below: // We only print the IW infoStream output on exc, below:
ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream printStream;
try { try {
iwc.setInfoStream(new PrintStreamInfoStream(new PrintStream(baos, true, "UTF8"))); printStream = new PrintStream(baos, true, "UTF8");
} catch (UnsupportedEncodingException uee) { } catch (UnsupportedEncodingException uee) {
// shouldn't happen // shouldn't happen
throw new RuntimeException(uee); throw new RuntimeException(uee);
} }
iwc.setInfoStream(new PrintStreamInfoStream(printStream));
printStream.println("\nTEST: WriterThread iter=" + i);
iwc.setOpenMode(OpenMode.APPEND); iwc.setOpenMode(OpenMode.APPEND);
try { try {
writer = new IndexWriter(dir, iwc); writer = new IndexWriter(dir, iwc);

View File

@ -833,12 +833,12 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
} }
throw new RuntimeException("MockDirectoryWrapper: cannot close: there are still open locks: " + openLocks, cause); throw new RuntimeException("MockDirectoryWrapper: cannot close: there are still open locks: " + openLocks, cause);
} }
if (getCheckIndexOnClose()) {
randomIOExceptionRate = 0.0; randomIOExceptionRate = 0.0;
randomIOExceptionRateOnOpen = 0.0; randomIOExceptionRateOnOpen = 0.0;
if (DirectoryReader.indexExists(this)) { if ((getCheckIndexOnClose() || assertNoUnreferencedFilesOnClose) && DirectoryReader.indexExists(this)) {
if (getCheckIndexOnClose()) {
if (LuceneTestCase.VERBOSE) { if (LuceneTestCase.VERBOSE) {
System.out.println("\nNOTE: MockDirectoryWrapper: now crush"); System.out.println("\nNOTE: MockDirectoryWrapper: now crush");
} }
@ -848,9 +848,9 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
} }
TestUtil.checkIndex(this, getCrossCheckTermVectorsOnClose(), true); TestUtil.checkIndex(this, getCrossCheckTermVectorsOnClose(), true);
}
// TODO: factor this out / share w/ TestIW.assertNoUnreferencedFiles // TODO: factor this out / share w/ TestIW.assertNoUnreferencedFiles
// nocommit pull this outside of "getCheckIndexOnClose"
if (assertNoUnreferencedFilesOnClose) { if (assertNoUnreferencedFilesOnClose) {
// now look for unreferenced files: discount ones that we tried to delete but could not // now look for unreferenced files: discount ones that we tried to delete but could not
@ -863,9 +863,7 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
TestUtil.disableVirusChecker(in); TestUtil.disableVirusChecker(in);
new IndexWriter(in, iwc).rollback(); new IndexWriter(in, iwc).rollback();
String[] endFiles = in.listAll();
Set<String> files = new HashSet<>(Arrays.asList(listAll()));
String[] endFiles = files.toArray(new String[0]);
Set<String> startSet = new TreeSet<>(Arrays.asList(startFiles)); Set<String> startSet = new TreeSet<>(Arrays.asList(startFiles));
Set<String> endSet = new TreeSet<>(Arrays.asList(endFiles)); Set<String> endSet = new TreeSet<>(Arrays.asList(endFiles));
@ -899,7 +897,7 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
extras += "\n\nThese files were added (waaaaaaaaaat!): " + added; extras += "\n\nThese files were added (waaaaaaaaaat!): " + added;
} }
throw new RuntimeException(this + ": unreferenced files: before delete:\n " + Arrays.toString(startFiles) + "\n after delete:\n " + Arrays.toString(endFiles) + extras); throw new RuntimeException("unreferenced files: before delete:\n " + Arrays.toString(startFiles) + "\n after delete:\n " + Arrays.toString(endFiles) + extras);
} }
DirectoryReader ir1 = DirectoryReader.open(this); DirectoryReader ir1 = DirectoryReader.open(this);
@ -912,7 +910,6 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
assert numDocs1 == numDocs2 : "numDocs changed after opening/closing IW: before=" + numDocs1 + " after=" + numDocs2; assert numDocs1 == numDocs2 : "numDocs changed after opening/closing IW: before=" + numDocs1 + " after=" + numDocs2;
} }
} }
}
success = true; success = true;
} finally { } finally {
if (success) { if (success) {

View File

@ -144,6 +144,8 @@ New Features
* SOLR-8285: Ensure the /export handler works with NULL field values (Joel Bernstein) * SOLR-8285: Ensure the /export handler works with NULL field values (Joel Bernstein)
* SOLR-8502: Improve Solr JDBC Driver to support SQL Clients like DBVisualizer (Kevin Risden, Joel Bernstein)
Bug Fixes Bug Fixes
---------------------- ----------------------
* SOLR-8386: Add field option in the new admin UI schema page loads up even when no schemaFactory has been * SOLR-8386: Add field option in the new admin UI schema page loads up even when no schemaFactory has been
@ -347,6 +349,10 @@ New Features
* SOLR-8586: added index fingerprint, a hash over all versions currently in the index. * SOLR-8586: added index fingerprint, a hash over all versions currently in the index.
PeerSync now uses this to check if replicas are in sync. (yonik) PeerSync now uses this to check if replicas are in sync. (yonik)
* SOLR-8500: Allow the number of threads ConcurrentUpdateSolrClient StreamingSolrClients configurable by a
system property. NOTE: this is an expert option and can result in more often needing to do full index replication
for recovery, the sweet spot for using this is very high volume, leader-only indexing. (Tim Potter, Erick Erickson)
Bug Fixes Bug Fixes
---------------------- ----------------------

View File

@ -42,6 +42,8 @@ import java.util.concurrent.ExecutorService;
public class StreamingSolrClients { public class StreamingSolrClients {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final int runnerCount = Integer.getInteger("solr.cloud.replication.runners", 1);
private HttpClient httpClient; private HttpClient httpClient;
private Map<String, ConcurrentUpdateSolrClient> solrClients = new HashMap<>(); private Map<String, ConcurrentUpdateSolrClient> solrClients = new HashMap<>();
@ -70,7 +72,7 @@ public class StreamingSolrClients {
// NOTE: increasing to more than 1 threadCount for the client could cause updates to be reordered // NOTE: increasing to more than 1 threadCount for the client could cause updates to be reordered
// on a greater scale since the current behavior is to only increase the number of connections/Runners when // on a greater scale since the current behavior is to only increase the number of connections/Runners when
// the queue is more than half full. // the queue is more than half full.
client = new ConcurrentUpdateSolrClient(url, httpClient, 100, 1, updateExecutor, true) { client = new ConcurrentUpdateSolrClient(url, httpClient, 100, runnerCount, updateExecutor, true) {
@Override @Override
public void handleError(Throwable ex) { public void handleError(Throwable ex) {
req.trackRequestResult(null, false); req.trackRequestResult(null, false);

View File

@ -37,10 +37,10 @@ public class SolrClientCache implements Serializable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private Map<String, SolrClient> solrClients = new HashMap(); private final Map<String, SolrClient> solrClients = new HashMap<>();
public synchronized CloudSolrClient getCloudSolrClient(String zkHost) { public synchronized CloudSolrClient getCloudSolrClient(String zkHost) {
CloudSolrClient client = null; CloudSolrClient client;
if (solrClients.containsKey(zkHost)) { if (solrClients.containsKey(zkHost)) {
client = (CloudSolrClient) solrClients.get(zkHost); client = (CloudSolrClient) solrClients.get(zkHost);
} else { } else {
@ -53,7 +53,7 @@ public class SolrClientCache implements Serializable {
} }
public synchronized HttpSolrClient getHttpSolrClient(String host) { public synchronized HttpSolrClient getHttpSolrClient(String host) {
HttpSolrClient client = null; HttpSolrClient client;
if (solrClients.containsKey(host)) { if (solrClients.containsKey(host)) {
client = (HttpSolrClient) solrClients.get(host); client = (HttpSolrClient) solrClients.get(host);
} else { } else {
@ -64,12 +64,11 @@ public class SolrClientCache implements Serializable {
} }
public void close() { public void close() {
Iterator<SolrClient> it = solrClients.values().iterator(); for(Map.Entry<String, SolrClient> entry : solrClients.entrySet()) {
while(it.hasNext()) {
try { try {
it.next().close(); entry.getValue().close();
} catch (IOException e) { } catch (IOException e) {
log.error(e.getMessage(), e); log.error("Error closing SolrClient for " + entry.getKey(), e);
} }
} }
} }

View File

@ -45,17 +45,17 @@ class ConnectionImpl implements Connection {
private final String url; private final String url;
private final SolrClientCache solrClientCache = new SolrClientCache(); private final SolrClientCache solrClientCache = new SolrClientCache();
private final CloudSolrClient client; private final CloudSolrClient client;
private final String collection;
private final Properties properties; private final Properties properties;
private final DatabaseMetaData databaseMetaData; private final DatabaseMetaData databaseMetaData;
private final Statement connectionStatement; private final Statement connectionStatement;
private String collection;
private boolean closed; private boolean closed;
private SQLWarning currentWarning; private SQLWarning currentWarning;
ConnectionImpl(String url, String zkHost, String collection, Properties properties) throws SQLException { ConnectionImpl(String url, String zkHost, String collection, Properties properties) throws SQLException {
this.url = url; this.url = url;
this.client = solrClientCache.getCloudSolrClient(zkHost); this.client = this.solrClientCache.getCloudSolrClient(zkHost);
this.collection = collection; this.setSchema(collection);
this.properties = properties; this.properties = properties;
this.connectionStatement = createStatement(); this.connectionStatement = createStatement();
this.databaseMetaData = new DatabaseMetaDataImpl(this, this.connectionStatement); this.databaseMetaData = new DatabaseMetaDataImpl(this, this.connectionStatement);
@ -158,7 +158,7 @@ class ConnectionImpl implements Connection {
@Override @Override
public void setCatalog(String catalog) throws SQLException { public void setCatalog(String catalog) throws SQLException {
throw new UnsupportedOperationException();
} }
@Override @Override
@ -301,7 +301,7 @@ class ConnectionImpl implements Connection {
@Override @Override
public boolean isValid(int timeout) throws SQLException { public boolean isValid(int timeout) throws SQLException {
// check that the connection isn't close and able to connect within the timeout // check that the connection isn't closed and able to connect within the timeout
try { try {
if(!isClosed()) { if(!isClosed()) {
this.client.connect(timeout, TimeUnit.SECONDS); this.client.connect(timeout, TimeUnit.SECONDS);
@ -345,7 +345,7 @@ class ConnectionImpl implements Connection {
@Override @Override
public void setSchema(String schema) throws SQLException { public void setSchema(String schema) throws SQLException {
throw new UnsupportedOperationException(); this.collection = schema;
} }
@Override @Override

View File

@ -16,12 +16,22 @@
*/ */
package org.apache.solr.client.solrj.io.sql; package org.apache.solr.client.solrj.io.sql;
import java.io.IOException;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DatabaseMetaData; import java.sql.DatabaseMetaData;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.RowIdLifetime; import java.sql.RowIdLifetime;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.Set;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.util.SimpleOrderedMap;
class DatabaseMetaDataImpl implements DatabaseMetaData { class DatabaseMetaDataImpl implements DatabaseMetaData {
private final ConnectionImpl connection; private final ConnectionImpl connection;
@ -32,6 +42,21 @@ class DatabaseMetaDataImpl implements DatabaseMetaData {
this.connectionStatement = connectionStatement; this.connectionStatement = connectionStatement;
} }
private int getVersionPart(String version, int part) {
// TODO Is there a better way to do this? Reuse code from elsewhere?
// Gets the parts of the Solr version. If fail then just return 0.
if (version != null) {
try {
String[] versionParts = version.split("\\.", 3);
return Integer.parseInt(versionParts[part]);
} catch (Throwable e) {
return 0;
}
} else {
return 0;
}
}
@Override @Override
public boolean allProceduresAreCallable() throws SQLException { public boolean allProceduresAreCallable() throws SQLException {
return false; return false;
@ -79,33 +104,79 @@ class DatabaseMetaDataImpl implements DatabaseMetaData {
@Override @Override
public String getDatabaseProductName() throws SQLException { public String getDatabaseProductName() throws SQLException {
return null; return "Apache Solr";
} }
@Override @Override
public String getDatabaseProductVersion() throws SQLException { public String getDatabaseProductVersion() throws SQLException {
return null; // Returns the version for the first live node in the Solr cluster.
SolrQuery sysQuery = new SolrQuery();
sysQuery.setRequestHandler("/admin/info/system");
CloudSolrClient cloudSolrClient = this.connection.getClient();
Set<String> liveNodes = cloudSolrClient.getZkStateReader().getClusterState().getLiveNodes();
SolrClient solrClient = null;
for (String node : liveNodes) {
try {
String nodeURL = cloudSolrClient.getZkStateReader().getBaseUrlForNodeName(node);
solrClient = new HttpSolrClient(nodeURL);
QueryResponse rsp = solrClient.query(sysQuery);
return String.valueOf(((SimpleOrderedMap) rsp.getResponse().get("lucene")).get("solr-spec-version"));
} catch (SolrServerException | IOException ignore) {
return "";
} finally {
if (solrClient != null) {
try {
solrClient.close();
} catch (IOException ignore) {
// Don't worry about failing to close the Solr client
}
}
}
}
// If no version found just return empty string
return "";
}
@Override
public int getDatabaseMajorVersion() throws SQLException {
return getVersionPart(this.getDatabaseProductVersion(), 0);
}
@Override
public int getDatabaseMinorVersion() throws SQLException {
return getVersionPart(this.getDatabaseProductVersion(), 1);
} }
@Override @Override
public String getDriverName() throws SQLException { public String getDriverName() throws SQLException {
return null; return this.getClass().getPackage().getSpecificationTitle();
} }
@Override @Override
public String getDriverVersion() throws SQLException { public String getDriverVersion() throws SQLException {
return null; return this.getClass().getPackage().getSpecificationVersion();
} }
@Override @Override
public int getDriverMajorVersion() { public int getDriverMajorVersion() {
try {
return getVersionPart(this.getDriverVersion(), 0);
} catch (SQLException e) {
return 0; return 0;
} }
}
@Override @Override
public int getDriverMinorVersion() { public int getDriverMinorVersion() {
try {
return getVersionPart(this.getDriverVersion(), 1);
} catch (SQLException e) {
return 0; return 0;
} }
}
@Override @Override
public boolean usesLocalFiles() throws SQLException { public boolean usesLocalFiles() throws SQLException {
@ -822,19 +893,9 @@ class DatabaseMetaDataImpl implements DatabaseMetaData {
return 0; return 0;
} }
@Override
public int getDatabaseMajorVersion() throws SQLException {
return 0;
}
@Override
public int getDatabaseMinorVersion() throws SQLException {
return 0;
}
@Override @Override
public int getJDBCMajorVersion() throws SQLException { public int getJDBCMajorVersion() throws SQLException {
return 0; return 4;
} }
@Override @Override

View File

@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.HashMap; import java.util.HashMap;
import java.util.Random; import java.util.Random;
@ -45,6 +46,7 @@ class StatementImpl implements Statement {
private String currentSQL; private String currentSQL;
private ResultSetImpl currentResultSet; private ResultSetImpl currentResultSet;
private SQLWarning currentWarning; private SQLWarning currentWarning;
private int maxRows;
StatementImpl(ConnectionImpl connection) { StatementImpl(ConnectionImpl connection) {
this.connection = connection; this.connection = connection;
@ -58,6 +60,10 @@ class StatementImpl implements Statement {
this.currentResultSet = null; this.currentResultSet = null;
} }
if(maxRows > 0 && !containsLimit(sql)) {
sql = sql + " limit "+Integer.toString(maxRows);
}
closed = false; // If closed reopen so Statement can be reused. closed = false; // If closed reopen so Statement can be reused.
this.currentResultSet = new ResultSetImpl(this, constructStream(sql)); this.currentResultSet = new ResultSetImpl(this, constructStream(sql));
return this.currentResultSet; return this.currentResultSet;
@ -132,12 +138,12 @@ class StatementImpl implements Statement {
@Override @Override
public int getMaxRows() throws SQLException { public int getMaxRows() throws SQLException {
throw new UnsupportedOperationException(); return this.maxRows;
} }
@Override @Override
public void setMaxRows(int max) throws SQLException { public void setMaxRows(int max) throws SQLException {
throw new UnsupportedOperationException(); this.maxRows = max;
} }
@Override @Override
@ -351,4 +357,10 @@ class StatementImpl implements Statement {
public boolean isWrapperFor(Class<?> iface) throws SQLException { public boolean isWrapperFor(Class<?> iface) throws SQLException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
private boolean containsLimit(String sql) {
String[] tokens = sql.split("\\s+");
String secondToLastToken = tokens[tokens.length-2];
return ("limit").equals(secondToLastToken.toLowerCase(Locale.getDefault()));
}
} }

View File

@ -166,7 +166,8 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
stmt.close(); stmt.close();
//Test statement reuse //Test statement reuse
rs = stmt.executeQuery("select id, a_i, a_s, a_f from collection1 order by a_i asc limit 2"); stmt.setMaxRows(2);
rs = stmt.executeQuery("select id, a_i, a_s, a_f from collection1 order by a_i asc");
assert(rs.next()); assert(rs.next());
assert(rs.getLong("a_i") == 0); assert(rs.getLong("a_i") == 0);
assert(rs.getLong(2) == 0); assert(rs.getLong(2) == 0);
@ -176,8 +177,8 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
assert(!rs.next()); assert(!rs.next());
stmt.close(); stmt.close();
//Test simple loop //Test simple loop. Since limit is set it will override the statement maxRows.
rs = stmt.executeQuery("select id, a_i, a_s, a_f from collection1 order by a_i asc limit 100"); rs = stmt.executeQuery("select id, a_i, a_s, a_f from collection1 order by a_i asc LIMIT 100");
int count = 0; int count = 0;
while(rs.next()) { while(rs.next()) {
++count; ++count;
@ -381,7 +382,13 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
private void testJDBCMethods(String collection, String connectionString, Properties properties, String sql) throws Exception { private void testJDBCMethods(String collection, String connectionString, Properties properties, String sql) throws Exception {
try (Connection con = DriverManager.getConnection(connectionString, properties)) { try (Connection con = DriverManager.getConnection(connectionString, properties)) {
assertTrue(con.isValid(DEFAULT_CONNECTION_TIMEOUT)); assertTrue(con.isValid(DEFAULT_CONNECTION_TIMEOUT));
assertEquals(zkServer.getZkAddress(), con.getCatalog()); assertEquals(zkServer.getZkAddress(), con.getCatalog());
con.setCatalog(zkServer.getZkAddress());
assertEquals(zkServer.getZkAddress(), con.getCatalog());
assertEquals(collection, con.getSchema());
con.setSchema(collection);
assertEquals(collection, con.getSchema()); assertEquals(collection, con.getSchema());
DatabaseMetaData databaseMetaData = con.getMetaData(); DatabaseMetaData databaseMetaData = con.getMetaData();
@ -390,6 +397,21 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
assertEquals(con, databaseMetaData.getConnection()); assertEquals(con, databaseMetaData.getConnection());
assertEquals(connectionString, databaseMetaData.getURL()); assertEquals(connectionString, databaseMetaData.getURL());
assertEquals(4, databaseMetaData.getJDBCMajorVersion());
assertEquals(0, databaseMetaData.getJDBCMinorVersion());
assertEquals("Apache Solr", databaseMetaData.getDatabaseProductName());
// The following tests require package information that is not available when running via Maven
// assertEquals(this.getClass().getPackage().getSpecificationVersion(), databaseMetaData.getDatabaseProductVersion());
// assertEquals(0, databaseMetaData.getDatabaseMajorVersion());
// assertEquals(0, databaseMetaData.getDatabaseMinorVersion());
// assertEquals(this.getClass().getPackage().getSpecificationTitle(), databaseMetaData.getDriverName());
// assertEquals(this.getClass().getPackage().getSpecificationVersion(), databaseMetaData.getDriverVersion());
// assertEquals(0, databaseMetaData.getDriverMajorVersion());
// assertEquals(0, databaseMetaData.getDriverMinorVersion());
try(ResultSet rs = databaseMetaData.getCatalogs()) { try(ResultSet rs = databaseMetaData.getCatalogs()) {
assertTrue(rs.next()); assertTrue(rs.next());
assertEquals(zkServer.getZkAddress(), rs.getString("TABLE_CAT")); assertEquals(zkServer.getZkAddress(), rs.getString("TABLE_CAT"));