diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml index 35312ff27ca..ba258450a93 100644 --- a/.idea/dictionaries/kimchy.xml +++ b/.idea/dictionaries/kimchy.xml @@ -2,6 +2,7 @@ addr + appendable args asciifolding attr diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/AppendableBlobContainer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/AppendableBlobContainer.java new file mode 100644 index 00000000000..b22c2f88d15 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/AppendableBlobContainer.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.blobstore; + +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * @author kimchy (shay.banon) + */ +public interface AppendableBlobContainer extends BlobContainer { + + interface AppendBlobListener { + void withStream(StreamOutput os) throws IOException; + + void onCompleted(); + + void onFailure(Throwable t); + } + + interface AppendableBlob { + + void append(AppendBlobListener listener); + + void close(); + } + + AppendableBlob appendBlob(String blobName, boolean append) throws IOException; +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java new file mode 100644 index 00000000000..621751e2b89 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -0,0 +1,62 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.blobstore; + +import org.elasticsearch.common.collect.ImmutableMap; + +import java.io.IOException; + +/** + * @author kimchy (shay.banon) + */ +public interface BlobContainer { + + interface BlobNameFilter { + /** + * Return false if the blob should be filtered. + */ + boolean accept(String blobName); + } + + interface ReadBlobListener { + + void onPartial(byte[] data, int offset, int size) throws IOException; + + void onCompleted(); + + void onFailure(Throwable t); + } + + BlobPath path(); + + void readBlob(String blobName, ReadBlobListener listener); + + byte[] readBlobFully(String blobName) throws IOException; + + boolean deleteBlob(String blobName) throws IOException; + + void deleteBlobsByPrefix(String blobNamePrefix) throws IOException; + + void deleteBlobsByFilter(BlobNameFilter filter) throws IOException; + + ImmutableMap listBlobs() throws IOException; + + ImmutableMap listBlobsByPrefix(String blobNamePrefix) throws IOException; +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobMetaData.java new file mode 100644 index 00000000000..f934129ff45 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobMetaData.java @@ -0,0 +1,30 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.blobstore; + +/** + * @author kimchy (shay.banon) + */ +public interface BlobMetaData { + + String name(); + + long sizeInBytes(); +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java new file mode 100644 index 00000000000..553de1712a7 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java @@ -0,0 +1,65 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.blobstore; + +import org.elasticsearch.common.collect.ImmutableList; + +import java.util.Iterator; + +/** + * @author kimchy (shay.banon) + */ +public class BlobPath implements Iterable { + + private final ImmutableList paths; + + public BlobPath() { + this.paths = ImmutableList.of(); + } + + public static BlobPath cleanPath() { + return new BlobPath(); + } + + private BlobPath(ImmutableList paths) { + this.paths = paths; + } + + @Override public Iterator iterator() { + return paths.iterator(); + } + + public String[] toArray() { + return paths.toArray(new String[paths.size()]); + } + + public BlobPath add(String path) { + ImmutableList.Builder builder = ImmutableList.builder(); + return new BlobPath(builder.addAll(paths).add(path).build()); + } + + @Override public String toString() { + StringBuilder sb = new StringBuilder(); + for (String path : paths) { + sb.append('[').append(path).append(']'); + } + return sb.toString(); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java new file mode 100644 index 00000000000..0b326d1860c --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java @@ -0,0 +1,15 @@ +package org.elasticsearch.common.blobstore; + +/** + * @author kimchy (shay.banon) + */ +public interface BlobStore { + + ImmutableBlobContainer immutableBlobContainer(BlobPath path); + + AppendableBlobContainer appendableBlobContainer(BlobPath path); + + void delete(BlobPath path); + + void close(); +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobStoreException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobStoreException.java new file mode 100644 index 00000000000..f100f356f66 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobStoreException.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.blobstore; + +import org.elasticsearch.ElasticSearchException; + +/** + * @author kimchy (shay.banon) + */ +public class BlobStoreException extends ElasticSearchException { + + public BlobStoreException(String msg) { + super(msg); + } + + public BlobStoreException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/ImmutableBlobContainer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/ImmutableBlobContainer.java new file mode 100644 index 00000000000..63b0fa70cec --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/ImmutableBlobContainer.java @@ -0,0 +1,39 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.blobstore; + +import java.io.IOException; +import java.io.InputStream; + +/** + * @author kimchy (shay.banon) + */ +public interface ImmutableBlobContainer extends BlobContainer { + + interface WriterListener { + void onCompleted(); + + void onFailure(Throwable t); + } + + void writeBlob(String blobName, InputStream is, long sizeInBytes, WriterListener listener); + + void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException; +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/AbstractFsBlobContainer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/AbstractFsBlobContainer.java new file mode 100644 index 00000000000..5bb58a55281 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/AbstractFsBlobContainer.java @@ -0,0 +1,91 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.blobstore.fs; + +import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; +import org.elasticsearch.common.collect.ImmutableMap; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; + +/** + * @author kimchy (shay.banon) + */ +public abstract class AbstractFsBlobContainer extends AbstractBlobContainer { + + protected final FsBlobStore blobStore; + + protected final File path; + + public AbstractFsBlobContainer(FsBlobStore blobStore, BlobPath blobPath, File path) { + super(blobPath); + this.blobStore = blobStore; + this.path = path; + } + + public ImmutableMap listBlobs() throws IOException { + File[] files = path.listFiles(); + if (files == null || files.length == 0) { + return ImmutableMap.of(); + } + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (File file : files) { + builder.put(file.getName(), new FsBlobMetaData(file)); + } + return builder.build(); + } + + public boolean deleteBlob(String blobName) throws IOException { + return new File(path, blobName).delete(); + } + + @Override public void readBlob(final String blobName, final ReadBlobListener listener) { + blobStore.executorService().execute(new Runnable() { + @Override public void run() { + byte[] buffer = new byte[1024 * 16]; + FileInputStream is; + try { + is = new FileInputStream(new File(path, blobName)); + } catch (FileNotFoundException e) { + listener.onFailure(e); + return; + } + try { + int bytesRead; + while ((bytesRead = is.read(buffer)) != -1) { + listener.onPartial(buffer, 0, bytesRead); + } + listener.onCompleted(); + } catch (Exception e) { + try { + is.close(); + } catch (IOException e1) { + // ignore + } + listener.onFailure(e); + } + } + }); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsAppendableBlobContainer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsAppendableBlobContainer.java new file mode 100644 index 00000000000..300eb3aa2d8 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsAppendableBlobContainer.java @@ -0,0 +1,87 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.blobstore.fs; + +import org.elasticsearch.common.blobstore.AppendableBlobContainer; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.io.stream.DataOutputStreamOutput; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; + +/** + * @author kimchy (shay.banon) + */ +public class FsAppendableBlobContainer extends AbstractFsBlobContainer implements AppendableBlobContainer { + + public FsAppendableBlobContainer(FsBlobStore blobStore, BlobPath blobPath, File path) { + super(blobStore, blobPath, path); + } + + @Override public AppendableBlob appendBlob(String blobName, boolean append) throws IOException { + return new FsAppendableBlob(new File(path, blobName), append); + } + + private class FsAppendableBlob implements AppendableBlob { + + private final File file; + + public FsAppendableBlob(File file, boolean append) throws IOException { + this.file = file; + if (!append) { + RandomAccessFile raf = new RandomAccessFile(file, "rw"); + raf.setLength(0); + raf.close(); + } + } + + @Override public void append(final AppendBlobListener listener) { + blobStore.executorService().execute(new Runnable() { + @Override public void run() { + RandomAccessFile raf = null; + try { + raf = new RandomAccessFile(file, "rw"); + raf.seek(raf.length()); + listener.withStream(new DataOutputStreamOutput(raf)); + listener.onCompleted(); + raf.close(); + FileSystemUtils.syncFile(file); + } catch (IOException e) { + listener.onFailure(e); + } finally { + if (raf != null) { + try { + raf.close(); + } catch (IOException e) { + // ignore + } + } + } + } + }); + } + + @Override public void close() { + // nothing to do there + } + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobMetaData.java new file mode 100644 index 00000000000..7fffa2cee5c --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobMetaData.java @@ -0,0 +1,44 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.blobstore.fs; + +import org.elasticsearch.common.blobstore.BlobMetaData; + +import java.io.File; + +/** + * @author kimchy (shay.banon) + */ +public class FsBlobMetaData implements BlobMetaData { + + private final File file; + + public FsBlobMetaData(File file) { + this.file = file; + } + + @Override public String name() { + return file.getName(); + } + + @Override public long sizeInBytes() { + return file.length(); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java new file mode 100644 index 00000000000..ec5675c9a1c --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java @@ -0,0 +1,109 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.blobstore.fs; + +import org.elasticsearch.common.blobstore.*; +import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.settings.Settings; + +import java.io.File; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.common.util.concurrent.DynamicExecutors.*; + +/** + * @author kimchy (shay.banon) + */ +public class FsBlobStore implements BlobStore { + + private final File path; + + private final ExecutorService executorService; + + public FsBlobStore(Settings settings, File path) { + this.path = path; + if (!path.exists()) { + boolean b = path.mkdirs(); + if (!b) { + throw new BlobStoreException("Failed to create directory at [" + path + "]"); + } + } + if (!path.isDirectory()) { + throw new BlobStoreException("Path is not a directory at [" + path + "]"); + } + executorService = Executors.newCachedThreadPool(daemonThreadFactory(settings, "fs_blobstore")); + } + + @Override public String toString() { + return path.toString(); + } + + public File path() { + return path; + } + + public ExecutorService executorService() { + return executorService; + } + + @Override public ImmutableBlobContainer immutableBlobContainer(BlobPath path) { + return new FsImmutableBlobContainer(this, path, buildAndCreate(path)); + } + + @Override public AppendableBlobContainer appendableBlobContainer(BlobPath path) { + return new FsAppendableBlobContainer(this, path, buildAndCreate(path)); + } + + @Override public void delete(BlobPath path) { + FileSystemUtils.deleteRecursively(buildPath(path)); + } + + @Override public void close() { + executorService.shutdown(); + try { + executorService.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // ignore + } + executorService.shutdownNow(); + } + + private synchronized File buildAndCreate(BlobPath path) { + File f = buildPath(path); + f.mkdirs(); + return f; + } + + private File buildPath(BlobPath path) { + String[] paths = path.toArray(); + if (paths.length == 0) { + return path(); + } + File blobPath = new File(this.path, paths[0]); + if (paths.length > 1) { + for (int i = 1; i < paths.length; i++) { + blobPath = new File(blobPath, paths[i]); + } + } + return blobPath; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsImmutableBlobContainer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsImmutableBlobContainer.java new file mode 100644 index 00000000000..e23000cb96d --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsImmutableBlobContainer.java @@ -0,0 +1,87 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.blobstore.fs; + +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.ImmutableBlobContainer; +import org.elasticsearch.common.blobstore.support.BlobStores; +import org.elasticsearch.common.io.FileSystemUtils; + +import java.io.*; + +/** + * @author kimchy (shay.banon) + */ +public class FsImmutableBlobContainer extends AbstractFsBlobContainer implements ImmutableBlobContainer { + + public FsImmutableBlobContainer(FsBlobStore blobStore, BlobPath blobPath, File path) { + super(blobStore, blobPath, path); + } + + @Override public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) { + blobStore.executorService().execute(new Runnable() { + @Override public void run() { + File file = new File(path, blobName); + RandomAccessFile raf = null; + try { + raf = new RandomAccessFile(file, "rw"); + } catch (FileNotFoundException e) { + listener.onFailure(e); + } + try { + try { + byte[] buffer = new byte[16 * 1024]; + int bytesRead; + while ((bytesRead = is.read(buffer)) != -1) { + raf.write(buffer, 0, bytesRead); + } + } finally { + try { + is.close(); + } catch (IOException ex) { + // do nothing + } + try { + raf.close(); + } catch (IOException ex) { + // do nothing + } + } + FileSystemUtils.syncFile(file); + listener.onCompleted(); + } catch (Exception e) { + // just on the safe size, try and delete it on failure + try { + if (file.exists()) { + file.delete(); + } + } catch (Exception e1) { + // ignore + } + listener.onFailure(e); + } + } + }); + } + + @Override public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException { + BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/AbstractBlobContainer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/AbstractBlobContainer.java new file mode 100644 index 00000000000..9cfebd58949 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/AbstractBlobContainer.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.blobstore.support; + +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.collect.ImmutableMap; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +/** + * @author kimchy (shay.banon) + */ +public abstract class AbstractBlobContainer implements BlobContainer { + + private final BlobPath path; + + protected AbstractBlobContainer(BlobPath path) { + this.path = path; + } + + @Override public BlobPath path() { + return this.path; + } + + @Override public byte[] readBlobFully(String blobName) throws IOException { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference failure = new AtomicReference(); + final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + + readBlob(blobName, new ReadBlobListener() { + @Override public void onPartial(byte[] data, int offset, int size) { + bos.write(data, offset, size); + } + + @Override public void onCompleted() { + latch.countDown(); + } + + @Override public void onFailure(Throwable t) { + failure.set(t); + latch.countDown(); + } + }); + + try { + latch.await(); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted while waiting to read [" + blobName + "]"); + } + + if (failure.get() != null) { + if (failure.get() instanceof IOException) { + throw (IOException) failure.get(); + } else { + throw new IOException("Failed to get [" + blobName + "]", failure.get()); + } + } + return bos.toByteArray(); + } + + @Override public ImmutableMap listBlobsByPrefix(String blobNamePrefix) throws IOException { + ImmutableMap allBlobs = listBlobs(); + ImmutableMap.Builder blobs = ImmutableMap.builder(); + for (BlobMetaData blob : allBlobs.values()) { + if (blob.name().startsWith(blobNamePrefix)) { + blobs.put(blob.name(), blob); + } + } + return blobs.build(); + } + + @Override public void deleteBlobsByPrefix(final String blobNamePrefix) throws IOException { + deleteBlobsByFilter(new BlobNameFilter() { + @Override public boolean accept(String blobName) { + return blobName.startsWith(blobNamePrefix); + } + }); + } + + @Override public void deleteBlobsByFilter(BlobNameFilter filter) throws IOException { + ImmutableMap blobs = listBlobs(); + for (BlobMetaData blob : blobs.values()) { + if (filter.accept(blob.name())) { + deleteBlob(blob.name()); + } + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/BlobStores.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/BlobStores.java new file mode 100644 index 00000000000..d8ba888fe7e --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/BlobStores.java @@ -0,0 +1,62 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.blobstore.support; + +import org.elasticsearch.common.blobstore.ImmutableBlobContainer; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +/** + * @author kimchy (shay.banon) + */ +public class BlobStores { + + public static void syncWriteBlob(ImmutableBlobContainer blobContainer, String blobName, InputStream is, long sizeInBytes) throws IOException { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference failure = new AtomicReference(); + blobContainer.writeBlob(blobName, is, sizeInBytes, new ImmutableBlobContainer.WriterListener() { + @Override public void onCompleted() { + latch.countDown(); + } + + @Override public void onFailure(Throwable t) { + failure.set(t); + latch.countDown(); + } + }); + try { + latch.await(); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted while waiting to write [" + blobName + "]"); + } + + if (failure.get() != null) { + if (failure.get() instanceof IOException) { + throw (IOException) failure.get(); + } else { + throw new IOException("Failed to get [" + blobName + "]", failure.get()); + } + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/PlainBlobMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/PlainBlobMetaData.java new file mode 100644 index 00000000000..7bfd90a1ebb --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/PlainBlobMetaData.java @@ -0,0 +1,45 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.blobstore.support; + +import org.elasticsearch.common.blobstore.BlobMetaData; + +/** + * @author kimchy (shay.banon) + */ +public class PlainBlobMetaData implements BlobMetaData { + + private final String name; + + private final long sizeInBytes; + + public PlainBlobMetaData(String name, long sizeInBytes) { + this.name = name; + this.sizeInBytes = sizeInBytes; + } + + @Override public String name() { + return this.name; + } + + @Override public long sizeInBytes() { + return this.sizeInBytes; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/Streams.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/Streams.java index 70bbd20811d..fdb91ea51f7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/Streams.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/Streams.java @@ -35,7 +35,7 @@ import java.io.*; */ public abstract class Streams { - public static final int BUFFER_SIZE = 4096; + public static final int BUFFER_SIZE = 1024 * 8; //--------------------------------------------------------------------- @@ -50,7 +50,7 @@ public abstract class Streams { * @return the number of bytes copied * @throws IOException in case of I/O errors */ - public static int copy(File in, File out) throws IOException { + public static long copy(File in, File out) throws IOException { Preconditions.checkNotNull(in, "No input File specified"); Preconditions.checkNotNull(out, "No output File specified"); return copy(new BufferedInputStream(new FileInputStream(in)), @@ -98,11 +98,11 @@ public abstract class Streams { * @return the number of bytes copied * @throws IOException in case of I/O errors */ - public static int copy(InputStream in, OutputStream out) throws IOException { + public static long copy(InputStream in, OutputStream out) throws IOException { Preconditions.checkNotNull(in, "No InputStream specified"); Preconditions.checkNotNull(out, "No OutputStream specified"); try { - int byteCount = 0; + long byteCount = 0; byte[] buffer = new byte[BUFFER_SIZE]; int bytesRead; while ((bytesRead = in.read(buffer)) != -1) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/BytesStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/BytesStreamInput.java index ee87b8766f0..6ac14a2e606 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/BytesStreamInput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/BytesStreamInput.java @@ -45,6 +45,10 @@ public class BytesStreamInput extends StreamInput { this.count = count; } + public int position() { + return this.pos; + } + @Override public int read() throws IOException { return (pos < count) ? (buf[pos++] & 0xff) : -1; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/Gateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/Gateway.java index cc1e0f06097..0843c38e4ab 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/Gateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/Gateway.java @@ -28,6 +28,8 @@ import org.elasticsearch.common.inject.Module; */ public interface Gateway extends LifecycleComponent { + String type(); + void write(MetaData metaData) throws GatewayException; MetaData read() throws GatewayException; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java new file mode 100644 index 00000000000..429c2aefb0f --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java @@ -0,0 +1,184 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.gateway.blobstore; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.blobstore.*; +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.builder.BinaryXContentBuilder; +import org.elasticsearch.gateway.Gateway; +import org.elasticsearch.gateway.GatewayException; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +/** + * @author kimchy (shay.banon) + */ +public abstract class BlobStoreGateway extends AbstractLifecycleComponent implements Gateway { + + private BlobStore blobStore; + + private ByteSizeValue chunkSize; + + private BlobPath basePath; + + private ImmutableBlobContainer metaDataBlobContainer; + + private volatile int currentIndex; + + protected BlobStoreGateway(Settings settings) throws IOException { + super(settings); + } + + protected void initialize(BlobStore blobStore, ClusterName clusterName) throws IOException { + this.blobStore = blobStore; + this.chunkSize = componentSettings.getAsBytesSize("chunk_size", null); + this.basePath = BlobPath.cleanPath().add(clusterName.value()); + this.metaDataBlobContainer = blobStore.immutableBlobContainer(basePath.add("metadata")); + this.currentIndex = findLatestIndex(); + logger.debug("Latest metadata found at index [" + currentIndex + "]"); + } + + @Override public String toString() { + return type() + "://" + blobStore + "/" + basePath; + } + + public BlobStore blobStore() { + return blobStore; + } + + public BlobPath basePath() { + return basePath; + } + + public ByteSizeValue chunkSize() { + return this.chunkSize; + } + + @Override public void reset() throws Exception { + blobStore.delete(BlobPath.cleanPath()); + } + + @Override protected void doStart() throws ElasticSearchException { + } + + @Override protected void doStop() throws ElasticSearchException { + } + + @Override protected void doClose() throws ElasticSearchException { + } + + @Override public MetaData read() throws GatewayException { + try { + this.currentIndex = findLatestIndex(); + } catch (IOException e) { + throw new GatewayException("Failed to find latest metadata to read from", e); + } + if (currentIndex == -1) + return null; + String metaData = "metadata-" + currentIndex; + + try { + return readMetaData(metaDataBlobContainer.readBlobFully(metaData)); + } catch (GatewayException e) { + throw e; + } catch (Exception e) { + throw new GatewayException("Failed to read metadata [" + metaData + "] from gateway", e); + } + } + + @Override public void write(MetaData metaData) throws GatewayException { + final String newMetaData = "metadata-" + (currentIndex + 1); + BinaryXContentBuilder builder; + try { + builder = XContentFactory.contentBinaryBuilder(XContentType.JSON); + builder.prettyPrint(); + builder.startObject(); + MetaData.Builder.toXContent(metaData, builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + } catch (IOException e) { + throw new GatewayException("Failed to serialize metadata into gateway", e); + } + + try { + metaDataBlobContainer.writeBlob(newMetaData, new ByteArrayInputStream(builder.unsafeBytes(), 0, builder.unsafeBytesLength()), builder.unsafeBytesLength()); + } catch (IOException e) { + throw new GatewayException("Failed to write metadata [" + newMetaData + "]", e); + } + + currentIndex++; + + try { + metaDataBlobContainer.deleteBlobsByFilter(new BlobContainer.BlobNameFilter() { + @Override public boolean accept(String blobName) { + return blobName.startsWith("metadata-") && !newMetaData.equals(blobName); + } + }); + } catch (IOException e) { + logger.debug("Failed to delete old metadata, will do it next time", e); + } + } + + private int findLatestIndex() throws IOException { + ImmutableMap blobs = metaDataBlobContainer.listBlobsByPrefix("metadata-"); + + int index = -1; + for (BlobMetaData md : blobs.values()) { + if (logger.isTraceEnabled()) { + logger.trace("[findLatestMetadata]: Processing [" + md.name() + "]"); + } + String name = md.name(); + int fileIndex = Integer.parseInt(name.substring(name.indexOf('-') + 1)); + if (fileIndex >= index) { + // try and read the meta data + try { + readMetaData(metaDataBlobContainer.readBlobFully(name)); + index = fileIndex; + } catch (IOException e) { + logger.warn("[findLatestMetadata]: Failed to read metadata from [" + name + "], ignoring...", e); + } + } + } + + return index; + } + + private MetaData readMetaData(byte[] data) throws IOException { + XContentParser parser = null; + try { + parser = XContentFactory.xContent(XContentType.JSON).createParser(data); + return MetaData.Builder.fromXContent(parser, settings); + } finally { + if (parser != null) { + parser.close(); + } + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java index 58daa265f3c..e61131b7ea2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java @@ -19,207 +19,42 @@ package org.elasticsearch.gateway.fs; -import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.blobstore.fs.FsBlobStore; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.common.xcontent.builder.BinaryXContentBuilder; import org.elasticsearch.env.Environment; -import org.elasticsearch.gateway.Gateway; -import org.elasticsearch.gateway.GatewayException; +import org.elasticsearch.gateway.blobstore.BlobStoreGateway; import org.elasticsearch.index.gateway.fs.FsIndexGatewayModule; -import java.io.*; - -import static org.elasticsearch.common.io.FileSystemUtils.*; +import java.io.File; +import java.io.IOException; /** * @author kimchy (shay.banon) */ -public class FsGateway extends AbstractLifecycleComponent implements Gateway { - - private final Environment environment; - - private final ClusterName clusterName; - - private final String location; - - private final File gatewayHome; - - private volatile int currentIndex; +public class FsGateway extends BlobStoreGateway { @Inject public FsGateway(Settings settings, Environment environment, ClusterName clusterName) throws IOException { super(settings); - this.clusterName = clusterName; - this.environment = environment; - this.location = componentSettings.get("location"); - - this.gatewayHome = createGatewayHome(location, environment, clusterName); - - if (!gatewayHome.exists()) { - throw new IOException("FsGateway location [" + gatewayHome + "] can't be created"); - } - - this.currentIndex = findLatestIndex(gatewayHome); - logger.debug("Latest metadata found at index [" + currentIndex + "]"); - } - - @Override protected void doStart() throws ElasticSearchException { - } - - @Override protected void doStop() throws ElasticSearchException { - } - - @Override protected void doClose() throws ElasticSearchException { - } - - public File gatewayHome() { - return gatewayHome; - } - - private static File createGatewayHome(String location, Environment environment, ClusterName clusterName) { - File f; - if (location != null) { - // if its a custom location, append the cluster name to it just so we have unique - // in case two clusters point to the same location - f = new File(new File(location), clusterName.value()); + File gatewayFile; + String location = componentSettings.get("location"); + if (location == null) { + logger.warn("using local fs location for gateway, should be changed to be a shared location across nodes"); + gatewayFile = new File(environment.workFile(), "gateway"); } else { - // work already includes the cluster name - f = new File(environment.workWithClusterFile(), "gateway"); + gatewayFile = new File(location); } - if (f.exists() && f.isDirectory()) { - return f; - } - boolean result; - for (int i = 0; i < 5; i++) { - result = f.mkdirs(); - if (result) { - break; - } - } - - return f; + initialize(new FsBlobStore(componentSettings, gatewayFile), clusterName); } - @Override public void write(MetaData metaData) throws GatewayException { - try { - final File file = new File(gatewayHome, "metadata-" + (currentIndex + 1)); - for (int i = 0; i < 5; i++) { - if (file.createNewFile()) - break; - } - if (!file.exists()) { - throw new GatewayException("Failed to create new file [" + file + "]"); - } - - BinaryXContentBuilder builder = XContentFactory.contentBinaryBuilder(XContentType.JSON); - builder.prettyPrint(); - builder.startObject(); - MetaData.Builder.toXContent(metaData, builder, ToXContent.EMPTY_PARAMS); - builder.endObject(); - - FileOutputStream fileStream = new FileOutputStream(file); - fileStream.write(builder.unsafeBytes(), 0, builder.unsafeBytesLength()); - fileStream.close(); - - syncFile(file); - - currentIndex++; - - //delete old files. - File[] oldFiles = gatewayHome.listFiles(new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return name.startsWith("metadata-") && !name.equals(file.getName()); - } - }); - - for (File oldFile : oldFiles) { - oldFile.delete(); - } - - } catch (IOException e) { - throw new GatewayException("can't write new metadata file into the gateway", e); - } - } - - @Override public MetaData read() throws GatewayException { - try { - if (currentIndex == -1) - return null; - - File file = new File(gatewayHome, "metadata-" + currentIndex); - if (!file.exists()) { - throw new GatewayException("can't find current metadata file"); - } - return readMetaData(file); - } catch (GatewayException e) { - throw e; - } catch (Exception e) { - throw new GatewayException("can't read metadata file from the gateway", e); - } + @Override public String type() { + return "fs"; } @Override public Class suggestIndexGateway() { return FsIndexGatewayModule.class; } - - @Override public void reset() { - FileSystemUtils.deleteRecursively(gatewayHome, false); - currentIndex = -1; - } - - private int findLatestIndex(File gatewayHome) { - File[] files = gatewayHome.listFiles(new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return name.startsWith("metadata-"); - } - }); - - int index = -1; - for (File file : files) { - if (logger.isTraceEnabled()) { - logger.trace("[findLatestMetadata]: Processing file [" + file + "]"); - } - String name = file.getName(); - int fileIndex = Integer.parseInt(name.substring(name.indexOf('-') + 1)); - if (fileIndex >= index) { - // try and read the meta data - try { - readMetaData(file); - index = fileIndex; - } catch (IOException e) { - logger.warn("[findLatestMetadata]: Failed to read metadata from [" + file + "], ignoring...", e); - } - } - } - - return index; - } - - private MetaData readMetaData(File file) throws IOException { - FileInputStream fileStream = new FileInputStream(file); - XContentParser parser = null; - try { - parser = XContentFactory.xContent(XContentType.JSON).createParser(fileStream); - return MetaData.Builder.fromXContent(parser, settings); - } finally { - if (parser != null) { - parser.close(); - } - try { - fileStream.close(); - } catch (Exception e) { - // ignore - } - } - } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGatewayModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGatewayModule.java index d9df36b5347..d49484973b7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGatewayModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGatewayModule.java @@ -23,7 +23,7 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.gateway.Gateway; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class FsGatewayModule extends AbstractModule { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java index c879e669efe..07dab8842bb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java @@ -34,10 +34,16 @@ import org.elasticsearch.index.gateway.none.NoneIndexGatewayModule; */ public class NoneGateway extends AbstractLifecycleComponent implements Gateway { + public static final String TYPE = "none"; + @Inject public NoneGateway(Settings settings) { super(settings); } + @Override public String type() { + return TYPE; + } + @Override protected void doStart() throws ElasticSearchException { } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexGateway.java index 2fce98640f4..d4ef41a6a7a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexGateway.java @@ -27,6 +27,8 @@ import org.elasticsearch.index.IndexComponent; */ public interface IndexGateway extends IndexComponent, CloseableIndexComponent { + String type(); + Class shardGatewayClass(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java index 7092dd4190e..ea3f58599ad 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java @@ -34,6 +34,8 @@ import static org.elasticsearch.common.unit.TimeValue.*; */ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexComponent { + String type(); + /** * Recovers the state of the shard from the gateway. */ @@ -42,7 +44,7 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo /** * Snapshots the given shard into the gateway. */ - SnapshotStatus snapshot(Snapshot snapshot); + SnapshotStatus snapshot(Snapshot snapshot) throws IndexShardGatewaySnapshotFailedException; /** * Returns true if this gateway requires scheduling management for snapshot diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexGateway.java new file mode 100644 index 00000000000..c115129c834 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexGateway.java @@ -0,0 +1,84 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.gateway.blobstore; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.gateway.Gateway; +import org.elasticsearch.gateway.blobstore.BlobStoreGateway; +import org.elasticsearch.gateway.none.NoneGateway; +import org.elasticsearch.index.AbstractIndexComponent; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.gateway.IndexGateway; +import org.elasticsearch.index.settings.IndexSettings; + +/** + * @author kimchy (shay.banon) + */ +public abstract class BlobStoreIndexGateway extends AbstractIndexComponent implements IndexGateway { + + private final BlobStoreGateway gateway; + + private final BlobStore blobStore; + + private final BlobPath indexPath; + + protected ByteSizeValue chunkSize; + + protected BlobStoreIndexGateway(Index index, @IndexSettings Settings indexSettings, Gateway gateway) { + super(index, indexSettings); + + if (gateway.type().equals(NoneGateway.TYPE)) { + logger.warn("index gateway is configured, but no cluster level gateway configured, cluster level metadata will be lost on full shutdown"); + } + + this.gateway = (BlobStoreGateway) gateway; + this.blobStore = this.gateway.blobStore(); + + this.chunkSize = componentSettings.getAsBytesSize("chunk_size", this.gateway.chunkSize()); + + this.indexPath = this.gateway.basePath().add("indices").add(index.name()); + } + + @Override public String toString() { + return type() + "://" + blobStore + "/" + indexPath; + } + + public BlobStore blobStore() { + return blobStore; + } + + public BlobPath indexPath() { + return this.indexPath; + } + + public ByteSizeValue chunkSize() { + return this.chunkSize; + } + + @Override public void close(boolean delete) throws ElasticSearchException { + if (delete) { + blobStore.delete(indexPath); + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java new file mode 100644 index 00000000000..b1433543fe7 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java @@ -0,0 +1,589 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.gateway.blobstore; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.common.blobstore.*; +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.Lists; +import org.elasticsearch.common.io.stream.BytesStreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lucene.store.InputStreamIndexInput; +import org.elasticsearch.common.lucene.store.ThreadSafeInputStreamIndexInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.trove.TObjectLongHashMap; +import org.elasticsearch.common.trove.TObjectLongIterator; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; +import org.elasticsearch.index.gateway.IndexGateway; +import org.elasticsearch.index.gateway.IndexShardGateway; +import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException; +import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException; +import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.index.shard.service.InternalIndexShard; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.index.translog.TranslogStreams.*; + +/** + * @author kimchy (shay.banon) + */ +public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway { + + protected final ThreadPool threadPool; + + protected final InternalIndexShard indexShard; + + protected final Store store; + + protected final RecoveryThrottler recoveryThrottler; + + + protected final ByteSizeValue chunkSize; + + protected final BlobStore blobStore; + + protected final BlobPath shardPath; + + protected final ImmutableBlobContainer indexContainer; + + protected final AppendableBlobContainer translogContainer; + + private volatile AppendableBlobContainer.AppendableBlob translogBlob; + + protected BlobStoreIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway indexGateway, + IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) { + super(shardId, indexSettings); + + this.threadPool = threadPool; + this.indexShard = (InternalIndexShard) indexShard; + this.store = store; + this.recoveryThrottler = recoveryThrottler; + + this.chunkSize = ((BlobStoreIndexGateway) indexGateway).chunkSize(); // can be null -> no chunking + this.blobStore = ((BlobStoreIndexGateway) indexGateway).blobStore(); + this.shardPath = ((BlobStoreIndexGateway) indexGateway).indexPath().add(Integer.toString(shardId.id())); + + this.indexContainer = blobStore.immutableBlobContainer(shardPath.add("index")); + this.translogContainer = blobStore.appendableBlobContainer(shardPath.add("translog")); + } + + @Override public String toString() { + return type() + "://" + blobStore + "/" + shardPath; + } + + @Override public boolean requiresSnapshotScheduling() { + return true; + } + + @Override public void close(boolean delete) throws ElasticSearchException { + if (translogBlob != null) { + translogBlob.close(); + translogBlob = null; + } + if (delete) { + blobStore.delete(shardPath); + } + } + + @Override public SnapshotStatus snapshot(final Snapshot snapshot) throws IndexShardGatewaySnapshotFailedException { + long totalTimeStart = System.currentTimeMillis(); + boolean indexDirty = false; + + final SnapshotIndexCommit snapshotIndexCommit = snapshot.indexCommit(); + final Translog.Snapshot translogSnapshot = snapshot.translogSnapshot(); + + ImmutableMap indicesBlobs = null; + TObjectLongHashMap combinedIndicesBlobs = null; + + int indexNumberOfFiles = 0; + long indexTotalFilesSize = 0; + long indexTime = 0; + if (snapshot.indexChanged()) { + long time = System.currentTimeMillis(); + indexDirty = true; + + try { + indicesBlobs = indexContainer.listBlobs(); + } catch (IOException e) { + throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to list indices files from gateway", e); + } + combinedIndicesBlobs = buildCombinedPartsBlobs(indicesBlobs); + + // snapshot into the index + final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length); + final CopyOnWriteArrayList failures = new CopyOnWriteArrayList(); + + for (final String fileName : snapshotIndexCommit.getFiles()) { + // don't copy over the segments file, it will be copied over later on as part of the + // final snapshot phase + if (fileName.equals(snapshotIndexCommit.getSegmentsFileName())) { + latch.countDown(); + continue; + } + // if the file exists in the gateway, and has the same length, don't copy it over + long fileSize; + try { + fileSize = snapshotIndexCommit.getDirectory().fileLength(fileName); + } catch (IOException e) { + throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to get length on local store", e); + } + if (combinedIndicesBlobs.contains(fileName) && combinedIndicesBlobs.get(fileName) == fileSize) { + latch.countDown(); + continue; + } + + // we are snapshotting the file + + indexNumberOfFiles++; + indexTotalFilesSize += fileSize; + + if (combinedIndicesBlobs.contains(fileName)) { + try { + indexContainer.deleteBlobsByPrefix(fileName); + } catch (IOException e) { + logger.debug("failed to delete [" + fileName + "] before snapshotting, ignoring..."); + } + } + + try { + snapshotFile(snapshotIndexCommit.getDirectory(), fileName, latch, failures); + } catch (IOException e) { + failures.add(e); + latch.countDown(); + } + } + + try { + latch.await(); + } catch (InterruptedException e) { + failures.add(e); + } + if (!failures.isEmpty()) { + throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", failures.get(failures.size() - 1)); + } + indexTime = System.currentTimeMillis() - time; + } + + + // handle if snapshot has changed + final AtomicInteger translogNumberOfOperations = new AtomicInteger(); + long translogTime = 0; + + if (snapshot.newTranslogCreated() || snapshot.sameTranslogNewOperations()) { + long time = System.currentTimeMillis(); + + if (snapshot.newTranslogCreated() && translogBlob != null) { + translogBlob.close(); + translogBlob = null; + } + + if (translogBlob == null) { + try { + translogBlob = translogContainer.appendBlob("translog-" + translogSnapshot.translogId(), !snapshot.newTranslogCreated()); + } catch (IOException e) { + throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to create translog", e); + } + } + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference failure = new AtomicReference(); + translogBlob.append(new AppendableBlobContainer.AppendBlobListener() { + @Override public void withStream(StreamOutput os) throws IOException { + int deltaNumberOfOperations; + Iterable operationsIt; + if (snapshot.newTranslogCreated()) { + deltaNumberOfOperations = translogSnapshot.size(); + operationsIt = translogSnapshot; + } else { + deltaNumberOfOperations = translogSnapshot.size() - snapshot.lastTranslogSize(); + operationsIt = translogSnapshot.skipTo(snapshot.lastTranslogSize()); + } + os.writeInt(deltaNumberOfOperations); + for (Translog.Operation operation : operationsIt) { + writeTranslogOperation(os, operation); + } + translogNumberOfOperations.set(deltaNumberOfOperations); + } + + @Override public void onCompleted() { + latch.countDown(); + } + + @Override public void onFailure(Throwable t) { + failure.set(t); + latch.countDown(); + } + }); + + try { + latch.await(); + } catch (InterruptedException e) { + failure.set(e); + } + + if (failure.get() != null) { + throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to snapshot translog", failure.get()); + } + + translogTime = System.currentTimeMillis() - time; + } + + // now write the segments file + if (indexDirty) { + try { + indexNumberOfFiles++; + if (indicesBlobs.containsKey(snapshotIndexCommit.getSegmentsFileName())) { + indexContainer.deleteBlob(snapshotIndexCommit.getSegmentsFileName()); + } + indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(snapshotIndexCommit.getSegmentsFileName()); + long time = System.currentTimeMillis(); + + IndexInput indexInput = snapshotIndexCommit.getDirectory().openInput(snapshotIndexCommit.getSegmentsFileName()); + try { + InputStreamIndexInput is = new InputStreamIndexInput(indexInput, Long.MAX_VALUE); + indexContainer.writeBlob(snapshotIndexCommit.getSegmentsFileName(), is, is.actualSizeToRead()); + } finally { + try { + indexInput.close(); + } catch (Exception e) { + // ignore + } + } + indexTime += (System.currentTimeMillis() - time); + } catch (Exception e) { + throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize index snapshot into [" + snapshotIndexCommit.getSegmentsFileName() + "]", e); + } + } + + // delete the old translog + if (snapshot.newTranslogCreated()) { + try { + translogContainer.deleteBlob("translog-" + snapshot.lastTranslogId()); + } catch (IOException e) { + // ignore + } + } + + // delete old index files + if (indexDirty) { + for (TObjectLongIterator it = combinedIndicesBlobs.iterator(); it.hasNext();) { + it.advance(); + boolean found = false; + for (final String fileName : snapshotIndexCommit.getFiles()) { + if (it.key().equals(fileName)) { + found = true; + break; + } + } + if (!found) { + try { + indexContainer.deleteBlobsByPrefix(it.key()); + } catch (IOException e) { + logger.debug("failed to delete unused index files, will retry later...", e); + } + } + } + } + + return new SnapshotStatus(new TimeValue(System.currentTimeMillis() - totalTimeStart), + new SnapshotStatus.Index(indexNumberOfFiles, new ByteSizeValue(indexTotalFilesSize), new TimeValue(indexTime)), + new SnapshotStatus.Translog(translogNumberOfOperations.get(), new TimeValue(translogTime))); + } + + @Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException { + RecoveryStatus.Index recoveryStatusIndex = recoverIndex(); + RecoveryStatus.Translog recoveryStatusTranslog = recoverTranslog(); + return new RecoveryStatus(recoveryStatusIndex, recoveryStatusTranslog); + } + + private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException { + final ImmutableMap blobs; + try { + blobs = translogContainer.listBlobsByPrefix("translog-"); + } catch (IOException e) { + throw new IndexShardGatewayRecoveryException(shardId, "Failed to list content of gateway", e); + } + + List translogIds = Lists.newArrayList(); + for (BlobMetaData blob : blobs.values()) { + long translogId = Long.parseLong(blob.name().substring(blob.name().indexOf('-') + 1)); + translogIds.add(translogId); + } + + if (translogIds.isEmpty()) { + // no recovery file found, start the shard and bail + indexShard.start(); + return new RecoveryStatus.Translog(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES)); + } + + Collections.sort(translogIds, new Comparator() { + @Override public int compare(Long o1, Long o2) { + return (int) (o2 - o1); + } + }); + + // try and recover from the latest translog id down to the first + Exception lastException = null; + for (Long translogId : translogIds) { + try { + ArrayList operations = Lists.newArrayList(); + byte[] translogData = translogContainer.readBlobFully("translog-" + translogId); + BytesStreamInput si = new BytesStreamInput(translogData); + while (true) { + // we recover them in parts, each part container the number of operations, and then the list of them + int numberOfOperations = si.readInt(); + for (int i = 0; i < numberOfOperations; i++) { + operations.add(readTranslogOperation(si)); + } + if (si.position() == translogData.length) { + // we have reached the end of the stream, bail + break; + } + } + indexShard.performRecovery(operations); + return new RecoveryStatus.Translog(indexShard.translog().currentId(), operations.size(), new ByteSizeValue(translogData.length, ByteSizeUnit.BYTES)); + } catch (Exception e) { + lastException = e; + logger.debug("Failed to read translog, will try the next one", e); + } + } + throw new IndexShardGatewayRecoveryException(shardId, "Failed to recovery translog", lastException); + } + + private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException { + final ImmutableMap blobs; + try { + blobs = indexContainer.listBlobs(); + } catch (IOException e) { + throw new IndexShardGatewayRecoveryException(shardId, "Failed to list content of gateway", e); + } + TObjectLongHashMap combinedBlobs = buildCombinedPartsBlobs(blobs); + + // filter out only the files that we need to recover, and reuse ones that exists in the store + List filesToRecover = new ArrayList(); + for (TObjectLongIterator it = combinedBlobs.iterator(); it.hasNext();) { + it.advance(); + // if the store has the file, and it has the same length, don't recover it + try { + if (store.directory().fileExists(it.key()) && store.directory().fileLength(it.key()) == it.value()) { + if (logger.isTraceEnabled()) { + logger.trace("not recovering [{}], exists in local store and has same size [{}]", it.key(), new ByteSizeValue(it.value())); + } + } else { + filesToRecover.add(it.key()); + } + } catch (Exception e) { + filesToRecover.add(it.key()); + logger.debug("failed to check local store for existence of [{}]", it.key()); + } + } + + long totalSize = 0; + final AtomicLong throttlingWaitTime = new AtomicLong(); + final CountDownLatch latch = new CountDownLatch(filesToRecover.size()); + final CopyOnWriteArrayList failures = new CopyOnWriteArrayList(); + for (final String fileToRecover : filesToRecover) { + totalSize += combinedBlobs.get(fileToRecover); + if (recoveryThrottler.tryStream(shardId, fileToRecover)) { + // we managed to get a recovery going + recoverFile(fileToRecover, blobs, latch, failures); + } else { + // lets reschedule to do it next time + threadPool.schedule(new Runnable() { + @Override public void run() { + throttlingWaitTime.addAndGet(recoveryThrottler.throttleInterval().millis()); + if (recoveryThrottler.tryStream(shardId, fileToRecover)) { + // we managed to get a recovery going + recoverFile(fileToRecover, blobs, latch, failures); + } else { + threadPool.schedule(this, recoveryThrottler.throttleInterval()); + } + } + }, recoveryThrottler.throttleInterval()); + } + } + + try { + latch.await(); + } catch (InterruptedException e) { + throw new IndexShardGatewayRecoveryException(shardId, "Interrupted while recovering index", e); + } + + if (!failures.isEmpty()) { + throw new IndexShardGatewayRecoveryException(shardId, "Failed to recovery index", failures.get(0)); + } + + long version = -1; + try { + if (IndexReader.indexExists(store.directory())) { + version = IndexReader.getCurrentVersion(store.directory()); + } + } catch (IOException e) { + throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e); + } + + return new RecoveryStatus.Index(version, filesToRecover.size(), new ByteSizeValue(totalSize, ByteSizeUnit.BYTES), TimeValue.timeValueMillis(throttlingWaitTime.get())); + } + + private void recoverFile(final String fileToRecover, final ImmutableMap blobs, final CountDownLatch latch, final List failures) { + final IndexOutput indexOutput; + try { + indexOutput = store.directory().createOutput(fileToRecover); + } catch (IOException e) { + recoveryThrottler.streamDone(shardId, fileToRecover); + failures.add(e); + latch.countDown(); + return; + } + final AtomicInteger partIndex = new AtomicInteger(); + indexContainer.readBlob(fileToRecover, new BlobContainer.ReadBlobListener() { + @Override public synchronized void onPartial(byte[] data, int offset, int size) throws IOException { + indexOutput.writeBytes(data, offset, size); + } + + @Override public synchronized void onCompleted() { + int part = partIndex.incrementAndGet(); + String partName = fileToRecover + ".part" + part; + if (blobs.containsKey(partName)) { + // continue with the new part + indexContainer.readBlob(partName, this); + } else { + // we are done... + try { + indexOutput.close(); + } catch (IOException e) { + onFailure(e); + } + } + recoveryThrottler.streamDone(shardId, fileToRecover); + latch.countDown(); + } + + @Override public void onFailure(Throwable t) { + recoveryThrottler.streamDone(shardId, fileToRecover); + failures.add(t); + latch.countDown(); + } + }); + } + + private void snapshotFile(Directory dir, String fileName, final CountDownLatch latch, final List failures) throws IOException { + long chunkBytes = Long.MAX_VALUE; + if (chunkSize != null) { + chunkBytes = chunkSize.bytes(); + } + + long totalLength = dir.fileLength(fileName); + long numberOfChunks = totalLength / chunkBytes; + if (totalLength % chunkBytes > 0) { + numberOfChunks++; + } + if (numberOfChunks == 0) { + numberOfChunks++; + } + + final AtomicLong counter = new AtomicLong(numberOfChunks); + for (long i = 0; i < numberOfChunks; i++) { + final long chunkNumber = i; + + IndexInput indexInput = null; + try { + indexInput = dir.openInput(fileName); + indexInput.seek(chunkNumber * chunkBytes); + InputStreamIndexInput is = new ThreadSafeInputStreamIndexInput(indexInput, chunkBytes); + + String blobName = fileName; + if (chunkNumber > 0) { + blobName += ".part" + chunkNumber; + } + + final IndexInput fIndexInput = indexInput; + indexContainer.writeBlob(blobName, is, is.actualSizeToRead(), new ImmutableBlobContainer.WriterListener() { + @Override public void onCompleted() { + try { + fIndexInput.close(); + } catch (IOException e) { + // ignore + } + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + } + + @Override public void onFailure(Throwable t) { + failures.add(t); + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + } + }); + } catch (Exception e) { + if (indexInput != null) { + try { + indexInput.close(); + } catch (IOException e1) { + // ignore + } + } + failures.add(e); + latch.countDown(); + } + } + } + + private TObjectLongHashMap buildCombinedPartsBlobs(ImmutableMap blobs) { + TObjectLongHashMap combinedBlobs = new TObjectLongHashMap(); + for (BlobMetaData blob : blobs.values()) { + String cleanName; + int partIndex = blob.name().indexOf(".part"); + if (partIndex == -1) { + cleanName = blob.name(); + } else { + cleanName = blob.name().substring(0, partIndex); + } + combinedBlobs.adjustOrPutValue(cleanName, blob.sizeInBytes(), blob.sizeInBytes()); + } + return combinedBlobs; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexGateway.java index a00675af8c0..f978e60a9bd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexGateway.java @@ -19,81 +19,29 @@ package org.elasticsearch.index.gateway.fs; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.gateway.Gateway; -import org.elasticsearch.gateway.fs.FsGateway; -import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexException; -import org.elasticsearch.index.gateway.IndexGateway; import org.elasticsearch.index.gateway.IndexShardGateway; +import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway; import org.elasticsearch.index.settings.IndexSettings; -import java.io.File; - -import static org.elasticsearch.common.io.FileSystemUtils.*; - /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ -public class FsIndexGateway extends AbstractIndexComponent implements IndexGateway { - - private final String location; - - private File indexGatewayHome; +public class FsIndexGateway extends BlobStoreIndexGateway { @Inject public FsIndexGateway(Index index, @IndexSettings Settings indexSettings, Environment environment, Gateway gateway) { - super(index, indexSettings); + super(index, indexSettings, gateway); + } - String location = componentSettings.get("location"); - if (location == null) { - if (gateway instanceof FsGateway) { - indexGatewayHome = new File(new File(((FsGateway) gateway).gatewayHome(), "indices"), index.name()); - } else { - indexGatewayHome = new File(new File(new File(environment.workWithClusterFile(), "gateway"), "indices"), index.name()); - } - location = Strings.cleanPath(indexGatewayHome.getAbsolutePath()); - } else { - indexGatewayHome = new File(new File(location), index.name()); - } - this.location = location; - - if (!(indexGatewayHome.exists() && indexGatewayHome.isDirectory())) { - boolean result; - for (int i = 0; i < 5; i++) { - result = indexGatewayHome.mkdirs(); - if (result) { - break; - } - } - } - if (!(indexGatewayHome.exists() && indexGatewayHome.isDirectory())) { - throw new IndexException(index, "Failed to create index gateway at [" + indexGatewayHome + "]"); - } + @Override public String type() { + return "fs"; } @Override public Class shardGatewayClass() { return FsIndexShardGateway.class; } - - @Override public void close(boolean delete) { - if (!delete) { - return; - } - try { - String[] files = indexGatewayHome.list(); - if (files == null || files.length == 0) { - deleteRecursively(indexGatewayHome, true); - } - } catch (Exception e) { - // ignore - } - } - - public File indexGatewayHome() { - return this.indexGatewayHome; - } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java index 14ab0409c2a..8148c115d5b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java @@ -19,419 +19,28 @@ package org.elasticsearch.index.gateway.fs; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.store.IndexInput; -import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.DataInputStreamInput; -import org.elasticsearch.common.io.stream.DataOutputStreamOutput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.gateway.IndexGateway; -import org.elasticsearch.index.gateway.IndexShardGateway; -import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException; -import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException; +import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexShardGateway; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; -import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler; import org.elasticsearch.threadpool.ThreadPool; -import java.io.File; -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.util.ArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import static org.elasticsearch.common.io.FileSystemUtils.*; -import static org.elasticsearch.common.lucene.Directories.*; -import static org.elasticsearch.index.translog.TranslogStreams.*; - /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ -public class FsIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway { - - private final InternalIndexShard indexShard; - - private final ThreadPool threadPool; - - private final RecoveryThrottler recoveryThrottler; - - private final Store store; - - - private final boolean nativeCopy; - - private final File location; - - private final File locationIndex; - - private final File locationTranslog; +public class FsIndexShardGateway extends BlobStoreIndexShardGateway { @Inject public FsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway fsIndexGateway, IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) { - super(shardId, indexSettings); - this.threadPool = threadPool; - this.indexShard = (InternalIndexShard) indexShard; - this.store = store; - this.recoveryThrottler = recoveryThrottler; - - this.nativeCopy = componentSettings.getAsBoolean("native_copy", true); - - this.location = new File(((FsIndexGateway) fsIndexGateway).indexGatewayHome(), Integer.toString(shardId.id())); - this.locationIndex = new File(location, "index"); - this.locationTranslog = new File(location, "translog"); - - locationIndex.mkdirs(); - locationTranslog.mkdirs(); + super(shardId, indexSettings, threadPool, fsIndexGateway, indexShard, store, recoveryThrottler); } - @Override public boolean requiresSnapshotScheduling() { - return true; - } - - @Override public String toString() { - return "fs[" + location + "]"; - } - - @Override public void close(boolean delete) { - if (delete) { - deleteRecursively(location, true); - } - } - - @Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException { - RecoveryStatus.Index recoveryStatusIndex = recoverIndex(); - RecoveryStatus.Translog recoveryStatusTranslog = recoverTranslog(); - return new RecoveryStatus(recoveryStatusIndex, recoveryStatusTranslog); - } - - @Override public SnapshotStatus snapshot(Snapshot snapshot) { - long totalTimeStart = System.currentTimeMillis(); - boolean indexDirty = false; - boolean translogDirty = false; - - final SnapshotIndexCommit snapshotIndexCommit = snapshot.indexCommit(); - final Translog.Snapshot translogSnapshot = snapshot.translogSnapshot(); - - int indexNumberOfFiles = 0; - long indexTotalFilesSize = 0; - long indexTime = 0; - if (snapshot.indexChanged()) { - long time = System.currentTimeMillis(); - indexDirty = true; - // snapshot into the index - final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length); - final AtomicReference lastException = new AtomicReference(); - for (final String fileName : snapshotIndexCommit.getFiles()) { - // don't copy over the segments file, it will be copied over later on as part of the - // final snapshot phase - if (fileName.equals(snapshotIndexCommit.getSegmentsFileName())) { - latch.countDown(); - continue; - } - IndexInput indexInput = null; - try { - indexInput = snapshotIndexCommit.getDirectory().openInput(fileName); - File snapshotFile = new File(locationIndex, fileName); - if (snapshotFile.exists() && (snapshotFile.length() == indexInput.length())) { - // we assume its the same one, no need to copy - latch.countDown(); - continue; - } - } catch (Exception e) { - logger.debug("Failed to verify file equality based on length, copying...", e); - } finally { - if (indexInput != null) { - try { - indexInput.close(); - } catch (IOException e) { - // ignore - } - } - } - indexNumberOfFiles++; - try { - indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(fileName); - } catch (IOException e) { - // ignore... - } - threadPool.execute(new Runnable() { - @Override public void run() { - File copyTo = new File(locationIndex, fileName); - try { - copyFromDirectory(snapshotIndexCommit.getDirectory(), fileName, copyTo, nativeCopy); - } catch (Exception e) { - lastException.set(new IndexShardGatewaySnapshotFailedException(shardId, "Failed to copy to [" + copyTo + "], from dir [" + snapshotIndexCommit.getDirectory() + "] and file [" + fileName + "]", e)); - } finally { - latch.countDown(); - } - } - }); - } - try { - latch.await(); - } catch (InterruptedException e) { - lastException.set(e); - } - if (lastException.get() != null) { - throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", lastException.get()); - } - indexTime = System.currentTimeMillis() - time; - } - // we reopen the RAF each snapshot and not keep an open one since we want to make sure we - // can sync it to disk later on (close it as well) - File translogFile = new File(locationTranslog, "translog-" + translogSnapshot.translogId()); - RandomAccessFile translogRaf = null; - - // if we have a different trnaslogId we want to flush the full translog to a new file (based on the translogId). - // If we still work on existing translog, just append the latest translog operations - int translogNumberOfOperations = 0; - long translogTime = 0; - if (snapshot.newTranslogCreated()) { - translogDirty = true; - try { - long time = System.currentTimeMillis(); - translogRaf = new RandomAccessFile(translogFile, "rw"); - StreamOutput out = new DataOutputStreamOutput(translogRaf); - out.writeInt(-1); // write the number of operations header with -1 currently - for (Translog.Operation operation : translogSnapshot) { - translogNumberOfOperations++; - writeTranslogOperation(out, operation); - } - translogTime = System.currentTimeMillis() - time; - } catch (Exception e) { - try { - if (translogRaf != null) { - translogRaf.close(); - } - } catch (IOException e1) { - // ignore - } - throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog into [" + translogFile + "]", e); - } - } else if (snapshot.sameTranslogNewOperations()) { - translogDirty = true; - try { - long time = System.currentTimeMillis(); - translogRaf = new RandomAccessFile(translogFile, "rw"); - // seek to the end, since we append - translogRaf.seek(translogRaf.length()); - StreamOutput out = new DataOutputStreamOutput(translogRaf); - for (Translog.Operation operation : translogSnapshot.skipTo(snapshot.lastTranslogSize())) { - translogNumberOfOperations++; - writeTranslogOperation(out, operation); - } - translogTime = System.currentTimeMillis() - time; - } catch (Exception e) { - try { - if (translogRaf != null) { - translogRaf.close(); - } - } catch (Exception e1) { - // ignore - } - throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to append snapshot translog into [" + translogFile + "]", e); - } - } - - // now write the segments file and update the translog header - try { - if (indexDirty) { - indexNumberOfFiles++; - indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(snapshotIndexCommit.getSegmentsFileName()); - long time = System.currentTimeMillis(); - copyFromDirectory(snapshotIndexCommit.getDirectory(), snapshotIndexCommit.getSegmentsFileName(), - new File(locationIndex, snapshotIndexCommit.getSegmentsFileName()), nativeCopy); - indexTime += (System.currentTimeMillis() - time); - } - } catch (Exception e) { - try { - if (translogRaf != null) { - translogRaf.close(); - } - } catch (Exception e1) { - // ignore - } - throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize index snapshot into [" + new File(locationIndex, snapshotIndexCommit.getSegmentsFileName()) + "]", e); - } - - try { - if (translogDirty) { - translogRaf.seek(0); - translogRaf.writeInt(translogSnapshot.size()); - translogRaf.close(); - - // now, sync the translog - syncFile(translogFile); - } - } catch (Exception e) { - if (translogRaf != null) { - try { - translogRaf.close(); - } catch (Exception e1) { - // ignore - } - } - throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize snapshot into [" + translogFile + "]", e); - } - - // delete the old translog - if (snapshot.newTranslogCreated()) { - new File(locationTranslog, "translog-" + snapshot.lastTranslogId()).delete(); - } - - // delete files that no longer exists in the index - if (indexDirty) { - File[] existingFiles = locationIndex.listFiles(); - for (File existingFile : existingFiles) { - boolean found = false; - for (final String fileName : snapshotIndexCommit.getFiles()) { - if (existingFile.getName().equals(fileName)) { - found = true; - break; - } - } - if (!found) { - existingFile.delete(); - } - } - } - - return new SnapshotStatus(new TimeValue(System.currentTimeMillis() - totalTimeStart), - new SnapshotStatus.Index(indexNumberOfFiles, new ByteSizeValue(indexTotalFilesSize), new TimeValue(indexTime)), - new SnapshotStatus.Translog(translogNumberOfOperations, new TimeValue(translogTime))); - } - - private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException { - File[] files = locationIndex.listFiles(); - final CountDownLatch latch = new CountDownLatch(files.length); - final AtomicReference lastException = new AtomicReference(); - final AtomicLong throttlingWaitTime = new AtomicLong(); - for (final File file : files) { - threadPool.execute(new Runnable() { - @Override public void run() { - try { - long throttlingStartTime = System.currentTimeMillis(); - while (!recoveryThrottler.tryStream(shardId, file.getName())) { - Thread.sleep(recoveryThrottler.throttleInterval().millis()); - } - throttlingWaitTime.addAndGet(System.currentTimeMillis() - throttlingStartTime); - copyToDirectory(file, store.directory(), file.getName(), nativeCopy); - } catch (Exception e) { - logger.debug("Failed to read [" + file + "] into [" + store + "]", e); - lastException.set(e); - } finally { - recoveryThrottler.streamDone(shardId, file.getName()); - latch.countDown(); - } - } - }); - } - try { - latch.await(); - } catch (InterruptedException e) { - lastException.set(e); - } - if (lastException.get() != null) { - throw new IndexShardGatewayRecoveryException(shardId(), "Failed to recover index files", lastException.get()); - } - long totalSize = 0; - for (File file : files) { - totalSize += file.length(); - } - - long version = -1; - try { - if (IndexReader.indexExists(store.directory())) { - version = IndexReader.getCurrentVersion(store.directory()); - } - } catch (IOException e) { - throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e); - } - - return new RecoveryStatus.Index(version, files.length, new ByteSizeValue(totalSize, ByteSizeUnit.BYTES), TimeValue.timeValueMillis(throttlingWaitTime.get())); - } - - private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException { - RandomAccessFile raf = null; - try { - long recoveryTranslogId = findLatestTranslogId(locationTranslog); - if (recoveryTranslogId == -1) { - // no recovery file found, start the shard and bail - indexShard.start(); - return new RecoveryStatus.Translog(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES)); - } - File recoveryTranslogFile = new File(locationTranslog, "translog-" + recoveryTranslogId); - raf = new RandomAccessFile(recoveryTranslogFile, "r"); - int numberOfOperations = raf.readInt(); - ArrayList operations = Lists.newArrayListWithCapacity(numberOfOperations); - for (int i = 0; i < numberOfOperations; i++) { - operations.add(readTranslogOperation(new DataInputStreamInput(raf))); - } - indexShard.performRecovery(operations); - return new RecoveryStatus.Translog(recoveryTranslogId, operations.size(), new ByteSizeValue(recoveryTranslogFile.length(), ByteSizeUnit.BYTES)); - } catch (Exception e) { - throw new IndexShardGatewayRecoveryException(shardId(), "Failed to perform recovery of translog", e); - } finally { - if (raf != null) { - try { - raf.close(); - } catch (IOException e) { - // ignore - } - } - } - } - - private long findLatestTranslogId(File location) { - File[] files = location.listFiles(new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return name.startsWith("translog-"); - } - }); - if (files == null) { - return -1; - } - - long index = -1; - for (File file : files) { - String name = file.getName(); - RandomAccessFile raf = null; - try { - raf = new RandomAccessFile(file, "r"); - // if header is -1, then its not properly written, ignore it - if (raf.readInt() == -1) { - continue; - } - } catch (Exception e) { - // broken file, continue - continue; - } finally { - try { - raf.close(); - } catch (IOException e) { - // ignore - } - } - long fileIndex = Long.parseLong(name.substring(name.indexOf('-') + 1)); - if (fileIndex >= index) { - index = fileIndex; - } - } - - return index; + @Override public String type() { + return "fs"; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexGateway.java index 64fa837d4f9..90c454d5a40 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexGateway.java @@ -28,7 +28,7 @@ import org.elasticsearch.index.gateway.IndexShardGateway; import org.elasticsearch.index.settings.IndexSettings; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class NoneIndexGateway extends AbstractIndexComponent implements IndexGateway { @@ -36,6 +36,10 @@ public class NoneIndexGateway extends AbstractIndexComponent implements IndexGat super(index, indexSettings); } + @Override public String type() { + return "none"; + } + @Override public Class shardGatewayClass() { return NoneIndexShardGateway.class; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexGatewayModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexGatewayModule.java index 7a3c240a15c..4c5b7ba229b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexGatewayModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexGatewayModule.java @@ -23,7 +23,7 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.index.gateway.IndexGateway; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class NoneIndexGatewayModule extends AbstractModule { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java index 4050c539dfc..56ab87e56d0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.gateway.none.NoneGateway; import org.elasticsearch.index.gateway.IndexShardGateway; import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException; import org.elasticsearch.index.settings.IndexSettings; @@ -50,6 +51,10 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement return new RecoveryStatus(new RecoveryStatus.Index(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES), TimeValue.timeValueMillis(0)), new RecoveryStatus.Translog(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES))); } + @Override public String type() { + return NoneGateway.TYPE; + } + @Override public SnapshotStatus snapshot(Snapshot snapshot) { return SnapshotStatus.NA; } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/common/io/StreamsTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/common/io/StreamsTests.java index 7a45131217f..3de7beb8a5c 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/common/io/StreamsTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/common/io/StreamsTests.java @@ -39,9 +39,9 @@ public class StreamsTests { byte[] content = "content".getBytes(); ByteArrayInputStream in = new ByteArrayInputStream(content); ByteArrayOutputStream out = new ByteArrayOutputStream(content.length); - int count = copy(in, out); + long count = copy(in, out); - assertThat(count, equalTo(content.length)); + assertThat(count, equalTo((long) content.length)); assertThat(Arrays.equals(content, out.toByteArray()), equalTo(true)); } diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/AbstractSimpleIndexGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/AbstractSimpleIndexGatewayTests.java index 43657a66069..fa79ae4045e 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/AbstractSimpleIndexGatewayTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/AbstractSimpleIndexGatewayTests.java @@ -52,10 +52,11 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests buildNode("server1"); // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well ((InternalNode) node("server1")).injector().getInstance(Gateway.class).reset(); + closeAllNodes(); } @Test public void testSnapshotOperations() throws Exception { - node("server1").start(); + startNode("server1"); // Translog tests diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/ChunkFsIndexGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/ChunkFsIndexGatewayTests.java new file mode 100644 index 00000000000..a561d0e3db2 --- /dev/null +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/ChunkFsIndexGatewayTests.java @@ -0,0 +1,29 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.gateway.fs; + +import org.elasticsearch.test.integration.gateway.AbstractSimpleIndexGatewayTests; + +/** + * @author kimchy (shay.banon) + */ +public class ChunkFsIndexGatewayTests extends AbstractSimpleIndexGatewayTests { + +} \ No newline at end of file diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/ChunkFsIndexGatewayTests.yml b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/ChunkFsIndexGatewayTests.yml new file mode 100644 index 00000000000..14b26d10d75 --- /dev/null +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/ChunkFsIndexGatewayTests.yml @@ -0,0 +1,10 @@ +cluster: + routing: + schedule: 100ms +gateway: + type: fs + fs: + chunk_size: 128b +index: + number_of_shards: 1 + number_of_replicas: 1 diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/FsMetaDataGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/FsMetaDataGatewayTests.java index b6de72b246c..2507b34c363 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/FsMetaDataGatewayTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/FsMetaDataGatewayTests.java @@ -30,7 +30,7 @@ import org.testng.annotations.Test; import static org.elasticsearch.client.Requests.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class FsMetaDataGatewayTests extends AbstractNodesTests { @@ -41,16 +41,15 @@ public class FsMetaDataGatewayTests extends AbstractNodesTests { closeAllNodes(); } - @BeforeMethod void buildNode1() throws Exception { + @BeforeMethod void buildNodeToReset() throws Exception { buildNode("server1"); // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well ((InternalNode) node("server1")).injector().getInstance(Gateway.class).reset(); + closeAllNodes(); } @Test public void testIndexActions() throws Exception { - buildNode("server1"); - ((InternalNode) node("server1")).injector().getInstance(Gateway.class).reset(); - node("server1").start(); + startNode("server1"); client("server1").admin().indices().create(createIndexRequest("test")).actionGet(); diff --git a/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java b/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java index 410f79d60a8..604d0fc2523 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java +++ b/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java @@ -103,6 +103,10 @@ public class CloudGateway extends AbstractLifecycleComponent implements logger.debug("Latest metadata found at index [" + currentIndex + "]"); } + @Override public String type() { + return "cloud"; + } + public String container() { return this.container; } diff --git a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java index d76dd1ec753..d97fafa6c30 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java +++ b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java @@ -107,6 +107,10 @@ public class CloudIndexGateway extends AbstractIndexComponent implements IndexGa logger.debug("Using location [{}], container [{}], index_directory [{}], chunk_size [{}]", this.location, this.indexContainer, this.indexDirectory, this.chunkSize); } + @Override public String type() { + return "cloud"; + } + public Location indexLocation() { return this.location; } diff --git a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java index adc0b4048dd..f3ab138f00f 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java +++ b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java @@ -120,6 +120,10 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen logger.trace("Using location [{}], container [{}], shard_directory [{}]", this.shardLocation, this.container, this.shardDirectory); } + @Override public String type() { + return "cloud"; + } + @Override public boolean requiresSnapshotScheduling() { return true; } diff --git a/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java b/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java index 1125f50c3fe..8fded8214cd 100644 --- a/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java +++ b/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java @@ -90,6 +90,10 @@ public class HdfsGateway extends AbstractLifecycleComponent implements logger.debug("Latest metadata found at index [" + currentIndex + "]"); } + @Override public String type() { + return "fs"; + } + public FileSystem fileSystem() { return this.fileSystem; } diff --git a/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexGateway.java b/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexGateway.java index 3d647e7f47e..be07d090bef 100644 --- a/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexGateway.java +++ b/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexGateway.java @@ -64,6 +64,10 @@ public class HdfsIndexGateway extends AbstractIndexComponent implements IndexGat this.indexPath = new Path(new Path(path, "indices"), index.name()); } + @Override public String type() { + return "hdfs"; + } + public FileSystem fileSystem() { return this.fileSystem; } diff --git a/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexShardGateway.java b/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexShardGateway.java index f9bb31b0886..063cd8ad121 100644 --- a/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexShardGateway.java +++ b/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexShardGateway.java @@ -114,6 +114,10 @@ public class HdfsIndexShardGateway extends AbstractIndexShardComponent implement } } + @Override public String type() { + return "hdfs"; + } + @Override public boolean requiresSnapshotScheduling() { return true; }