ignore parsing of wrong file format in the gateway (log warn), use index vesrion as the translog id

This commit is contained in:
kimchy 2010-08-24 23:12:18 +03:00
parent d0f5bc3403
commit fdc82dd5a3
5 changed files with 36 additions and 14 deletions

View File

@ -136,7 +136,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
try {
CommitPoint commitPoint = indexGateway.findCommitPoint(shard.id());
if (logger.isDebugEnabled()) {
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder(shard + ": checking for pre_allocation (gateway) on node " + discoNode + "\n");
sb.append(" gateway_files:\n");
for (CommitPoint.FileInfo fileInfo : commitPoint.indexFiles()) {
@ -146,7 +146,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
for (StoreFileMetaData md : storeFilesMetaData) {
sb.append(" [").append(md.name()).append("], size [").append(new ByteSizeValue(md.length())).append("]\n");
}
logger.debug(sb.toString());
logger.trace(sb.toString());
}
long sizeMatched = 0;
for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {

View File

@ -154,7 +154,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
try {
translog.newTranslog();
translog.newTranslog(IndexReader.getCurrentVersion(store.directory()));
this.nrtResource = buildNrtResource(indexWriter);
} catch (IOException e) {
try {
@ -346,7 +346,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
AcquirableResource<ReaderSearcherHolder> current = nrtResource;
nrtResource = buildNrtResource(indexWriter);
current.markForClose();
translog.newTranslog();
translog.newTranslog(IndexReader.getCurrentVersion(store.directory()));
} catch (IOException e) {
throw new FlushFailedEngineException(shardId, e);
} finally {
@ -355,7 +355,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
} else {
try {
indexWriter.commit();
translog.newTranslog();
translog.newTranslog(IndexReader.getCurrentVersion(store.directory()));
} catch (IOException e) {
throw new FlushFailedEngineException(shardId, e);
}

View File

@ -243,7 +243,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
expectedNumberOfOperations = translogSnapshot.totalOperations();
}
} else {
// if we have a commit point, check that we have all the files listed in it
// if we have a commit point, check that we have all the files listed in it in the blob store
if (!commitPoints.commits().isEmpty()) {
CommitPoint commitPoint = commitPoints.commits().get(0);
boolean allTranslogFilesExists = true;
@ -749,11 +749,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
}
private void writeCommitPoint(CommitPoint commitPoint) throws Exception {
byte[] data = CommitPoints.toXContent(commitPoint);
blobContainer.writeBlob("commit-" + commitPoint.version(), new FastByteArrayInputStream(data), data.length);
}
private boolean commitPointExistsInBlobs(CommitPoint commitPoint, ImmutableMap<String, BlobMetaData> blobs) {
for (CommitPoint.FileInfo fileInfo : Iterables.concat(commitPoint.indexFiles(), commitPoint.translogFiles())) {
if (!commitPointFileExistsInBlobs(fileInfo, blobs)) {
@ -818,10 +813,14 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
name = name.substring(0, name.indexOf(".part"));
}
try {
long currentGen = Long.parseLong(name.substring(2) /*__*/, Character.MAX_RADIX);
if (currentGen > generation) {
generation = currentGen;
}
} catch (NumberFormatException e) {
logger.warn("file [{}] does not conform to the '__' schema");
}
}
return generation;
}

View File

@ -64,6 +64,12 @@ public interface Translog extends IndexShardComponent {
*/
void newTranslog() throws TranslogException;
/**
* Creates a new transaction log internally. Note, users of this class should make
* sure that no operations are performed on the trans log when this is called.
*/
void newTranslog(long id) throws TranslogException;
/**
* Adds a create operation to the transaction log.
*/

View File

@ -104,6 +104,23 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
}
}
@Override public void newTranslog(long id) throws TranslogException {
synchronized (mutex) {
operationCounter.set(0);
lastPosition = 0;
this.id = id;
if (raf != null) {
raf.decreaseRefCount();
}
try {
raf = new RafReference(new File(location, "translog-" + id));
} catch (FileNotFoundException e) {
raf = null;
throw new TranslogException(shardId, "translog not found", e);
}
}
}
@Override public void add(Operation operation) throws TranslogException {
try {
BytesStreamOutput out = CachedStreamOutput.cachedBytes();