diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java index 76c0bbf8bf9..2c4655bc143 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java @@ -136,7 +136,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { try { CommitPoint commitPoint = indexGateway.findCommitPoint(shard.id()); - if (logger.isDebugEnabled()) { + if (logger.isTraceEnabled()) { StringBuilder sb = new StringBuilder(shard + ": checking for pre_allocation (gateway) on node " + discoNode + "\n"); sb.append(" gateway_files:\n"); for (CommitPoint.FileInfo fileInfo : commitPoint.indexFiles()) { @@ -146,7 +146,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { for (StoreFileMetaData md : storeFilesMetaData) { sb.append(" [").append(md.name()).append("], size [").append(new ByteSizeValue(md.length())).append("]\n"); } - logger.debug(sb.toString()); + logger.trace(sb.toString()); } long sizeMatched = 0; for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index b5ea73a9066..9ee97656e48 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -154,7 +154,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, } try { - translog.newTranslog(); + translog.newTranslog(IndexReader.getCurrentVersion(store.directory())); this.nrtResource = buildNrtResource(indexWriter); } catch (IOException e) { try { @@ -346,7 +346,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, AcquirableResource current = nrtResource; nrtResource = buildNrtResource(indexWriter); current.markForClose(); - translog.newTranslog(); + translog.newTranslog(IndexReader.getCurrentVersion(store.directory())); } catch (IOException e) { throw new FlushFailedEngineException(shardId, e); } finally { @@ -355,7 +355,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, } else { try { indexWriter.commit(); - translog.newTranslog(); + translog.newTranslog(IndexReader.getCurrentVersion(store.directory())); } catch (IOException e) { throw new FlushFailedEngineException(shardId, e); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java index 68b2226c626..83dd4252da9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java @@ -243,7 +243,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo expectedNumberOfOperations = translogSnapshot.totalOperations(); } } else { - // if we have a commit point, check that we have all the files listed in it + // if we have a commit point, check that we have all the files listed in it in the blob store if (!commitPoints.commits().isEmpty()) { CommitPoint commitPoint = commitPoints.commits().get(0); boolean allTranslogFilesExists = true; @@ -749,11 +749,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo } } - private void writeCommitPoint(CommitPoint commitPoint) throws Exception { - byte[] data = CommitPoints.toXContent(commitPoint); - blobContainer.writeBlob("commit-" + commitPoint.version(), new FastByteArrayInputStream(data), data.length); - } - private boolean commitPointExistsInBlobs(CommitPoint commitPoint, ImmutableMap blobs) { for (CommitPoint.FileInfo fileInfo : Iterables.concat(commitPoint.indexFiles(), commitPoint.translogFiles())) { if (!commitPointFileExistsInBlobs(fileInfo, blobs)) { @@ -818,9 +813,13 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo name = name.substring(0, name.indexOf(".part")); } - long currentGen = Long.parseLong(name.substring(2) /*__*/, Character.MAX_RADIX); - if (currentGen > generation) { - generation = currentGen; + try { + long currentGen = Long.parseLong(name.substring(2) /*__*/, Character.MAX_RADIX); + if (currentGen > generation) { + generation = currentGen; + } + } catch (NumberFormatException e) { + logger.warn("file [{}] does not conform to the '__' schema"); } } return generation; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java index 49b7e763cbb..02815fff8d0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -64,6 +64,12 @@ public interface Translog extends IndexShardComponent { */ void newTranslog() throws TranslogException; + /** + * Creates a new transaction log internally. Note, users of this class should make + * sure that no operations are performed on the trans log when this is called. + */ + void newTranslog(long id) throws TranslogException; + /** * Adds a create operation to the transaction log. */ diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 9c4e7213df3..03d951dc8b8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -104,6 +104,23 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog } } + @Override public void newTranslog(long id) throws TranslogException { + synchronized (mutex) { + operationCounter.set(0); + lastPosition = 0; + this.id = id; + if (raf != null) { + raf.decreaseRefCount(); + } + try { + raf = new RafReference(new File(location, "translog-" + id)); + } catch (FileNotFoundException e) { + raf = null; + throw new TranslogException(shardId, "translog not found", e); + } + } + } + @Override public void add(Operation operation) throws TranslogException { try { BytesStreamOutput out = CachedStreamOutput.cachedBytes();