implement a simple file system based translog, not the defeault yet, should provide better memory management

This commit is contained in:
kimchy 2010-07-07 09:30:07 +03:00
parent b2bdb149f9
commit 2b5458daf6
8 changed files with 391 additions and 7 deletions

View File

@ -24,7 +24,10 @@ import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.blobstore.support.BlobStores;
import org.elasticsearch.common.io.FileSystemUtils;
import java.io.*;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
/**
* @author kimchy (shay.banon)
@ -42,7 +45,9 @@ public class FsImmutableBlobContainer extends AbstractFsBlobContainer implements
RandomAccessFile raf;
try {
raf = new RandomAccessFile(file, "rw");
} catch (FileNotFoundException e) {
// clean the file if it exists
raf.setLength(0);
} catch (Exception e) {
listener.onFailure(e);
return;
}

View File

@ -61,7 +61,7 @@ public interface Translog extends IndexShardComponent {
* Creates a new transaction log internally. Note, users of this class should make
* sure that no operations are performed on the trans log when this is called.
*/
void newTranslog(long id);
void newTranslog(long id) throws TranslogException;
/**
* Adds a create operation to the transaction log.
@ -107,7 +107,7 @@ public interface Translog extends IndexShardComponent {
Operation next();
void seekForward(long position);
void seekForward(long length);
}
/**

View File

@ -0,0 +1,115 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog.fs;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.index.translog.TranslogStreams;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class FsSnapshot implements Translog.Snapshot {
private final ShardId shardId;
private final long id;
private final RafReference raf;
private final long length;
private final DataInputStream dis;
private Translog.Operation lastOperationRead = null;
private int position = 0;
public FsSnapshot(ShardId shardId, long id, RafReference raf, long length) throws FileNotFoundException {
this.shardId = shardId;
this.id = id;
this.raf = raf;
this.length = length;
this.dis = new DataInputStream(new FileInputStream(raf.file()));
}
@Override public long translogId() {
return this.id;
}
@Override public long position() {
return this.position;
}
@Override public long length() {
return this.length;
}
@Override public boolean hasNext() {
try {
if (position > length) {
return false;
}
int opSize = dis.readInt();
position += 4;
if ((position + opSize) > length) {
return false;
}
position += opSize;
byte[] data = new byte[opSize];
dis.readFully(data);
lastOperationRead = TranslogStreams.readTranslogOperation(new BytesStreamInput(data));
return true;
} catch (Exception e) {
return false;
}
}
@Override public Translog.Operation next() {
return this.lastOperationRead;
}
@Override public void seekForward(long length) {
this.position += length;
try {
this.dis.skip(length);
} catch (IOException e) {
throw new TranslogException(shardId, "failed to seek forward", e);
}
}
@Override public boolean release() throws ElasticSearchException {
try {
dis.close();
} catch (IOException e) {
// ignore
}
raf.decreaseRefCount();
return true;
}
}

View File

@ -0,0 +1,153 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog.fs;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.index.translog.TranslogStreams;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author kimchy (shay.banon)
*/
public class FsTranslog extends AbstractIndexShardComponent implements Translog {
private final File location;
private final Object mutex = new Object();
private volatile long id;
private final AtomicInteger operationCounter = new AtomicInteger();
private RafReference raf;
private volatile SoftReference<FastByteArrayOutputStream> cachedBos = new SoftReference<FastByteArrayOutputStream>(new FastByteArrayOutputStream());
@Inject public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv) {
this(shardId, indexSettings, new File(new File(new File(new File(nodeEnv.nodeFile(), "indices"), shardId.index().name()), Integer.toString(shardId.id())), "translog"));
}
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, File location) {
super(shardId, indexSettings);
this.location = location;
location.mkdirs();
}
@Override public long currentId() {
return this.id;
}
@Override public int size() {
return operationCounter.get();
}
@Override public ByteSizeValue estimateMemorySize() {
return new ByteSizeValue(0, ByteSizeUnit.BYTES);
}
@Override public void newTranslog(long id) throws TranslogException {
synchronized (mutex) {
operationCounter.set(0);
this.id = id;
if (raf != null) {
raf.decreaseRefCount();
}
try {
raf = new RafReference(new File(location, "translog-" + id));
} catch (FileNotFoundException e) {
raf = null;
throw new TranslogException(shardId, "translog not found", e);
}
}
}
@Override public void add(Operation operation) throws TranslogException {
synchronized (mutex) {
FastByteArrayOutputStream bos = cachedBos.get();
if (bos == null) {
bos = new FastByteArrayOutputStream();
cachedBos = new SoftReference<FastByteArrayOutputStream>(bos);
}
try {
bos.reset();
OutputStreamStreamOutput bosOs = new OutputStreamStreamOutput(bos);
TranslogStreams.writeTranslogOperation(bosOs, operation);
bosOs.flush();
raf.raf().writeInt(bos.size());
raf.raf().write(bos.unsafeByteArray(), 0, bos.size());
} catch (Exception e) {
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
}
}
}
@Override public Snapshot snapshot() throws TranslogException {
synchronized (mutex) {
try {
raf.increaseRefCount();
return new FsSnapshot(shardId, this.id, raf, raf.raf().getFilePointer());
} catch (IOException e) {
throw new TranslogException(shardId, "Failed to snapshot", e);
}
}
}
@Override public Snapshot snapshot(Snapshot snapshot) {
synchronized (mutex) {
if (currentId() != snapshot.translogId()) {
return snapshot();
}
try {
FsSnapshot fsSnapshot = (FsSnapshot) snapshot;
raf.increaseRefCount();
FsSnapshot newSnapshot = new FsSnapshot(shardId, id, raf, raf.raf().getFilePointer());
newSnapshot.seekForward(fsSnapshot.length());
return newSnapshot;
} catch (IOException e) {
throw new TranslogException(shardId, "Failed to snapshot", e);
}
}
}
@Override public void close() {
synchronized (mutex) {
if (raf != null) {
raf.decreaseRefCount();
raf = null;
}
}
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog.fs;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author kimchy (shay.banon)
*/
public class RafReference {
private final File file;
private final RandomAccessFile raf;
private final AtomicInteger refCount = new AtomicInteger();
public RafReference(File file) throws FileNotFoundException {
this.file = file;
this.raf = new RandomAccessFile(file, "rw");
this.refCount.incrementAndGet();
}
public File file() {
return this.file;
}
public RandomAccessFile raf() {
return this.raf;
}
public void increaseRefCount() {
refCount.incrementAndGet();
}
public void decreaseRefCount() {
if (refCount.decrementAndGet() <= 0) {
try {
raf.close();
file.delete();
} catch (IOException e) {
// ignore
}
}
}
}

View File

@ -70,11 +70,11 @@ public class MemorySnapshot implements Translog.Snapshot {
return operation;
}
@Override public void seekForward(long position) {
long numberToSeek = this.position + position;
@Override public void seekForward(long length) {
long numberToSeek = this.position + length;
while (numberToSeek-- != 0) {
operationsIt.next();
}
this.position = position;
this.position += length;
}
}

View File

@ -118,6 +118,7 @@ public abstract class AbstractSimpleTranslogTests {
snapshot.release();
snapshot = translog.snapshot();
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Create create = (Translog.Create) snapshot.next();
assertThat(create.source(), equalTo(new byte[]{1}));
snapshot.release();

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog.fs;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.index.translog.AbstractSimpleTranslogTests;
import org.elasticsearch.index.translog.Translog;
import org.testng.annotations.AfterTest;
import java.io.File;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
/**
* @author kimchy (shay.banon)
*/
public class FsSimpleTranslogTests extends AbstractSimpleTranslogTests {
@Override protected Translog create() {
return new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"));
}
@AfterTest public void cleanup() {
FileSystemUtils.deleteRecursively(new File("work/fs-translog"), true);
}
}