Merge pull request #16027 from nik9000/shard_core_key_map_closed

Handle closed readers in ShardCoreKeyMap
This commit is contained in:
Nik Everett 2016-01-18 09:16:10 -05:00
commit bfcffa2a00
2 changed files with 38 additions and 2 deletions

View File

@ -20,9 +20,11 @@
package org.elasticsearch.common.lucene; package org.elasticsearch.common.lucene;
import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReader.CoreClosedListener;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils; import org.elasticsearch.index.shard.ShardUtils;
import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -72,7 +74,7 @@ public final class ShardCoreKeyMap {
} }
final boolean added = objects.add(coreKey); final boolean added = objects.add(coreKey);
assert added; assert added;
reader.addCoreClosedListener(ownerCoreCacheKey -> { CoreClosedListener listener = ownerCoreCacheKey -> {
assert coreKey == ownerCoreCacheKey; assert coreKey == ownerCoreCacheKey;
synchronized (ShardCoreKeyMap.this) { synchronized (ShardCoreKeyMap.this) {
coreKeyToShard.remove(ownerCoreCacheKey); coreKeyToShard.remove(ownerCoreCacheKey);
@ -83,7 +85,20 @@ public final class ShardCoreKeyMap {
indexToCoreKey.remove(index); indexToCoreKey.remove(index);
} }
} }
}); };
boolean addedListener = false;
try {
reader.addCoreClosedListener(listener);
addedListener = true;
} finally {
if (false == addedListener) {
try {
listener.onClose(coreKey);
} catch (IOException e) {
throw new RuntimeException("Blow up trying to recover from failure to add listener", e);
}
}
}
} }
} }
} }

View File

@ -22,8 +22,10 @@ package org.elasticsearch.common.lucene;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.RandomIndexWriter; import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -55,6 +57,25 @@ public class ShardCoreKeyMapTests extends ESTestCase {
} }
} }
public void testAddingAClosedReader() throws Exception {
LeafReader reader;
try (Directory dir = newDirectory();
RandomIndexWriter writer = new RandomIndexWriter(random(), dir)) {
writer.addDocument(new Document());
try (DirectoryReader dirReader = ElasticsearchDirectoryReader.wrap(writer.getReader(), new ShardId("index1", 1))) {
reader = dirReader.leaves().get(0).reader();
}
}
ShardCoreKeyMap map = new ShardCoreKeyMap();
try {
map.add(reader);
fail("Expected AlreadyClosedException");
} catch (AlreadyClosedException e) {
// What we wanted
}
assertEquals(0, map.size());
}
public void testBasics() throws IOException { public void testBasics() throws IOException {
Directory dir1 = newDirectory(); Directory dir1 = newDirectory();
RandomIndexWriter w1 = new RandomIndexWriter(random(), dir1); RandomIndexWriter w1 = new RandomIndexWriter(random(), dir1);