have bytebuffer directory provide readonly version of files when done writing to them

This commit is contained in:
Shay Banon 2012-02-22 20:56:58 +02:00
parent 0ef2afb855
commit 7d18304fc7
4 changed files with 108 additions and 28 deletions

View File

@ -133,7 +133,7 @@ public class ByteBufferDirectory extends Directory {
ByteBufferFile file = files.remove(name); ByteBufferFile file = files.remove(name);
if (file == null) if (file == null)
throw new FileNotFoundException(name); throw new FileNotFoundException(name);
sizeInBytes.addAndGet(-file.sizeInBytes.get()); sizeInBytes.addAndGet(-file.sizeInBytes());
file.delete(); file.delete();
} }
@ -151,13 +151,18 @@ public class ByteBufferDirectory extends Directory {
if (name.contains("segments") || name.endsWith(".del")) { if (name.contains("segments") || name.endsWith(".del")) {
allocatorType = ByteBufferAllocator.Type.SMALL; allocatorType = ByteBufferAllocator.Type.SMALL;
} }
ByteBufferFile file = new ByteBufferFile(this, allocator.sizeInBytes(allocatorType)); ByteBufferFileOutput file = new ByteBufferFileOutput(this, allocator.sizeInBytes(allocatorType));
ByteBufferFile existing = files.put(name, file); ByteBufferFile existing = files.put(name, file);
if (existing != null) { if (existing != null) {
sizeInBytes.addAndGet(-existing.sizeInBytes.get()); sizeInBytes.addAndGet(-existing.sizeInBytes());
existing.delete(); existing.delete();
} }
return new ByteBufferIndexOutput(name, allocator, allocatorType, file); return new ByteBufferIndexOutput(this, name, allocator, allocatorType, file);
}
void closeOutput(String name, ByteBufferFileOutput file) {
// we replace the output file with a read only file, with no sync
files.put(name, new ByteBufferFile(file));
} }
@Override @Override

View File

@ -18,60 +18,66 @@ package org.apache.lucene.store.bytebuffer;
*/ */
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/** /**
*/ */
public class ByteBufferFile { public class ByteBufferFile {
private final CopyOnWriteArrayList<ByteBuffer> buffers = new CopyOnWriteArrayList<ByteBuffer>(); final ByteBufferDirectory dir;
private final ByteBufferDirectory dir;
final int bufferSize; final int bufferSize;
private volatile long length; final List<ByteBuffer> buffers;
// This is publicly modifiable via Directory.touchFile(), so direct access not supported
private volatile long lastModified = System.currentTimeMillis();
private final AtomicInteger refCount = new AtomicInteger(1); long length;
final AtomicLong sizeInBytes = new AtomicLong(); volatile long lastModified = System.currentTimeMillis();
final AtomicInteger refCount;
long sizeInBytes;
public ByteBufferFile(ByteBufferDirectory dir, int bufferSize) { public ByteBufferFile(ByteBufferDirectory dir, int bufferSize) {
this.dir = dir; this.dir = dir;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.buffers = new ArrayList<ByteBuffer>();
this.refCount = new AtomicInteger(1);
}
ByteBufferFile(ByteBufferFile file) {
this.dir = file.dir;
this.bufferSize = file.bufferSize;
this.buffers = file.buffers;
this.length = file.length;
this.lastModified = file.lastModified;
this.refCount = file.refCount;
this.sizeInBytes = file.sizeInBytes;
} }
// For non-stream access from thread that might be concurrent with writing
public long getLength() { public long getLength() {
return length; return length;
} }
protected void setLength(long length) {
this.length = length;
}
// For non-stream access from thread that might be concurrent with writing
public long getLastModified() { public long getLastModified() {
return lastModified; return lastModified;
} }
protected void setLastModified(long lastModified) { void setLastModified(long lastModified) {
this.lastModified = lastModified; this.lastModified = lastModified;
} }
protected final void addBuffer(ByteBuffer buffer) { long sizeInBytes() {
buffers.add(buffer); return sizeInBytes;
sizeInBytes.addAndGet(buffer.remaining());
dir.sizeInBytes.addAndGet(buffer.remaining());
} }
protected final ByteBuffer getBuffer(int index) { ByteBuffer getBuffer(int index) {
return buffers.get(index); return buffers.get(index);
} }
protected final int numBuffers() { int numBuffers() {
return buffers.size(); return buffers.size();
} }
@ -90,6 +96,7 @@ public class ByteBufferFile {
dir.releaseBuffer(buffer); dir.releaseBuffer(buffer);
} }
buffers.clear(); buffers.clear();
sizeInBytes = 0;
} }
} }
} }

View File

@ -0,0 +1,65 @@
package org.apache.lucene.store.bytebuffer;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.nio.ByteBuffer;
/**
*/
public class ByteBufferFileOutput extends ByteBufferFile {
public ByteBufferFileOutput(ByteBufferDirectory dir, int bufferSize) {
super(dir, bufferSize);
}
@Override
public synchronized long getLength() {
return super.getLength();
}
@Override
public synchronized long getLastModified() {
return super.getLastModified();
}
synchronized void setLength(long length) {
this.length = length;
}
synchronized final void addBuffer(ByteBuffer buffer) {
buffers.add(buffer);
sizeInBytes += buffer.remaining();
dir.sizeInBytes.addAndGet(buffer.remaining());
}
@Override
synchronized ByteBuffer getBuffer(int index) {
return super.getBuffer(index);
}
@Override
synchronized int numBuffers() {
return super.numBuffers();
}
@Override
synchronized long sizeInBytes() {
return super.sizeInBytes();
}
}

View File

@ -29,18 +29,20 @@ public class ByteBufferIndexOutput extends IndexOutput {
private final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0).asReadOnlyBuffer(); private final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0).asReadOnlyBuffer();
private final ByteBufferDirectory dir;
private final String name; private final String name;
private final ByteBufferAllocator allocator; private final ByteBufferAllocator allocator;
private final ByteBufferAllocator.Type allocatorType; private final ByteBufferAllocator.Type allocatorType;
private final int BUFFER_SIZE; private final int BUFFER_SIZE;
private final ByteBufferFile file; private final ByteBufferFileOutput file;
private ByteBuffer currentBuffer; private ByteBuffer currentBuffer;
private int currentBufferIndex; private int currentBufferIndex;
private long bufferStart; private long bufferStart;
public ByteBufferIndexOutput(String name, ByteBufferAllocator allocator, ByteBufferAllocator.Type allocatorType, ByteBufferFile file) throws IOException { public ByteBufferIndexOutput(ByteBufferDirectory dir, String name, ByteBufferAllocator allocator, ByteBufferAllocator.Type allocatorType, ByteBufferFileOutput file) throws IOException {
this.dir = dir;
this.name = name; this.name = name;
this.allocator = allocator; this.allocator = allocator;
this.allocatorType = allocatorType; this.allocatorType = allocatorType;
@ -54,6 +56,7 @@ public class ByteBufferIndexOutput extends IndexOutput {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
flush(); flush();
dir.closeOutput(name, file);
} }
@Override @Override