Gateway: Internal refactoring; requires a manual upgrade when using the fs gateway. Closes #232.
This commit is contained in:
parent 733abdcac3
commit 7ed7c6db4e
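Note (not part of the commit): this change moves the fs gateway onto the generic blob store abstraction added below (BlobStore, BlobContainer, BlobPath and the fs implementation). The following sketch is illustrative only and shows how the new API introduced here is expected to be used; the ImmutableSettings builder, the local directory, the cluster name and the blob name are assumptions made purely for the example.

// Illustrative sketch only -- not part of this commit. It exercises the blob store
// API added in this change; ImmutableSettings.settingsBuilder(), the /tmp path,
// "my-cluster" and "metadata-0" are assumptions for the example.
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
import org.elasticsearch.common.settings.ImmutableSettings;

import java.io.ByteArrayInputStream;
import java.io.File;

public class FsBlobStoreUsageSketch {
    public static void main(String[] args) throws Exception {
        // The fs blob store is rooted at a directory and hands out containers per BlobPath.
        FsBlobStore blobStore = new FsBlobStore(
                ImmutableSettings.settingsBuilder().build(), new File("/tmp/es-gateway"));
        ImmutableBlobContainer container = blobStore.immutableBlobContainer(
                BlobPath.cleanPath().add("my-cluster").add("metadata"));

        byte[] data = "{\"meta-data\":{}}".getBytes("UTF-8");
        // Synchronous write; an asynchronous overload takes a WriterListener instead.
        container.writeBlob("metadata-0", new ByteArrayInputStream(data), data.length);

        // Read the blob back and list what is stored under this container's path.
        byte[] readBack = container.readBlobFully("metadata-0");
        System.out.println(readBack.length + " bytes, blobs: " + container.listBlobs().keySet());
        blobStore.close();
    }
}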
@@ -2,6 +2,7 @@
<dictionary name="kimchy">
  <words>
    <w>addr</w>
    <w>appendable</w>
    <w>args</w>
    <w>asciifolding</w>
    <w>attr</w>
@@ -0,0 +1,47 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.blobstore;

import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
 * @author kimchy (shay.banon)
 */
public interface AppendableBlobContainer extends BlobContainer {

    interface AppendBlobListener {
        void withStream(StreamOutput os) throws IOException;

        void onCompleted();

        void onFailure(Throwable t);
    }

    interface AppendableBlob {

        void append(AppendBlobListener listener);

        void close();
    }

    AppendableBlob appendBlob(String blobName, boolean append) throws IOException;
}
@@ -0,0 +1,62 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.blobstore;

import org.elasticsearch.common.collect.ImmutableMap;

import java.io.IOException;

/**
 * @author kimchy (shay.banon)
 */
public interface BlobContainer {

    interface BlobNameFilter {
        /**
         * Return <tt>false</tt> if the blob should be filtered.
         */
        boolean accept(String blobName);
    }

    interface ReadBlobListener {

        void onPartial(byte[] data, int offset, int size) throws IOException;

        void onCompleted();

        void onFailure(Throwable t);
    }

    BlobPath path();

    void readBlob(String blobName, ReadBlobListener listener);

    byte[] readBlobFully(String blobName) throws IOException;

    boolean deleteBlob(String blobName) throws IOException;

    void deleteBlobsByPrefix(String blobNamePrefix) throws IOException;

    void deleteBlobsByFilter(BlobNameFilter filter) throws IOException;

    ImmutableMap<String, BlobMetaData> listBlobs() throws IOException;

    ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException;
}
@@ -0,0 +1,30 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.blobstore;

/**
 * @author kimchy (shay.banon)
 */
public interface BlobMetaData {

    String name();

    long sizeInBytes();
}
@@ -0,0 +1,65 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.blobstore;

import org.elasticsearch.common.collect.ImmutableList;

import java.util.Iterator;

/**
 * @author kimchy (shay.banon)
 */
public class BlobPath implements Iterable<String> {

    private final ImmutableList<String> paths;

    public BlobPath() {
        this.paths = ImmutableList.of();
    }

    public static BlobPath cleanPath() {
        return new BlobPath();
    }

    private BlobPath(ImmutableList<String> paths) {
        this.paths = paths;
    }

    @Override public Iterator<String> iterator() {
        return paths.iterator();
    }

    public String[] toArray() {
        return paths.toArray(new String[paths.size()]);
    }

    public BlobPath add(String path) {
        ImmutableList.Builder<String> builder = ImmutableList.builder();
        return new BlobPath(builder.addAll(paths).add(path).build());
    }

    @Override public String toString() {
        StringBuilder sb = new StringBuilder();
        for (String path : paths) {
            sb.append('[').append(path).append(']');
        }
        return sb.toString();
    }
}
@@ -0,0 +1,15 @@
package org.elasticsearch.common.blobstore;

/**
 * @author kimchy (shay.banon)
 */
public interface BlobStore {

    ImmutableBlobContainer immutableBlobContainer(BlobPath path);

    AppendableBlobContainer appendableBlobContainer(BlobPath path);

    void delete(BlobPath path);

    void close();
}
@@ -0,0 +1,36 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.blobstore;

import org.elasticsearch.ElasticSearchException;

/**
 * @author kimchy (shay.banon)
 */
public class BlobStoreException extends ElasticSearchException {

    public BlobStoreException(String msg) {
        super(msg);
    }

    public BlobStoreException(String msg, Throwable cause) {
        super(msg, cause);
    }
}
@@ -0,0 +1,39 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.blobstore;

import java.io.IOException;
import java.io.InputStream;

/**
 * @author kimchy (shay.banon)
 */
public interface ImmutableBlobContainer extends BlobContainer {

    interface WriterListener {
        void onCompleted();

        void onFailure(Throwable t);
    }

    void writeBlob(String blobName, InputStream is, long sizeInBytes, WriterListener listener);

    void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException;
}
@@ -0,0 +1,91 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.blobstore.fs;

import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.collect.ImmutableMap;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;

/**
 * @author kimchy (shay.banon)
 */
public abstract class AbstractFsBlobContainer extends AbstractBlobContainer {

    protected final FsBlobStore blobStore;

    protected final File path;

    public AbstractFsBlobContainer(FsBlobStore blobStore, BlobPath blobPath, File path) {
        super(blobPath);
        this.blobStore = blobStore;
        this.path = path;
    }

    public ImmutableMap<String, BlobMetaData> listBlobs() throws IOException {
        File[] files = path.listFiles();
        if (files == null || files.length == 0) {
            return ImmutableMap.of();
        }
        ImmutableMap.Builder<String, BlobMetaData> builder = ImmutableMap.builder();
        for (File file : files) {
            builder.put(file.getName(), new FsBlobMetaData(file));
        }
        return builder.build();
    }

    public boolean deleteBlob(String blobName) throws IOException {
        return new File(path, blobName).delete();
    }

    @Override public void readBlob(final String blobName, final ReadBlobListener listener) {
        blobStore.executorService().execute(new Runnable() {
            @Override public void run() {
                byte[] buffer = new byte[1024 * 16];
                FileInputStream is;
                try {
                    is = new FileInputStream(new File(path, blobName));
                } catch (FileNotFoundException e) {
                    listener.onFailure(e);
                    return;
                }
                try {
                    int bytesRead;
                    while ((bytesRead = is.read(buffer)) != -1) {
                        listener.onPartial(buffer, 0, bytesRead);
                    }
                    listener.onCompleted();
                } catch (Exception e) {
                    try {
                        is.close();
                    } catch (IOException e1) {
                        // ignore
                    }
                    listener.onFailure(e);
                }
            }
        });
    }
}
@@ -0,0 +1,87 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.blobstore.fs;

import org.elasticsearch.common.blobstore.AppendableBlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.DataOutputStreamOutput;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

/**
 * @author kimchy (shay.banon)
 */
public class FsAppendableBlobContainer extends AbstractFsBlobContainer implements AppendableBlobContainer {

    public FsAppendableBlobContainer(FsBlobStore blobStore, BlobPath blobPath, File path) {
        super(blobStore, blobPath, path);
    }

    @Override public AppendableBlob appendBlob(String blobName, boolean append) throws IOException {
        return new FsAppendableBlob(new File(path, blobName), append);
    }

    private class FsAppendableBlob implements AppendableBlob {

        private final File file;

        public FsAppendableBlob(File file, boolean append) throws IOException {
            this.file = file;
            if (!append) {
                RandomAccessFile raf = new RandomAccessFile(file, "rw");
                raf.setLength(0);
                raf.close();
            }
        }

        @Override public void append(final AppendBlobListener listener) {
            blobStore.executorService().execute(new Runnable() {
                @Override public void run() {
                    RandomAccessFile raf = null;
                    try {
                        raf = new RandomAccessFile(file, "rw");
                        raf.seek(raf.length());
                        listener.withStream(new DataOutputStreamOutput(raf));
                        listener.onCompleted();
                        raf.close();
                        FileSystemUtils.syncFile(file);
                    } catch (IOException e) {
                        listener.onFailure(e);
                    } finally {
                        if (raf != null) {
                            try {
                                raf.close();
                            } catch (IOException e) {
                                // ignore
                            }
                        }
                    }
                }
            });
        }

        @Override public void close() {
            // nothing to do there
        }
    }
}
@@ -0,0 +1,44 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.blobstore.fs;

import org.elasticsearch.common.blobstore.BlobMetaData;

import java.io.File;

/**
 * @author kimchy (shay.banon)
 */
public class FsBlobMetaData implements BlobMetaData {

    private final File file;

    public FsBlobMetaData(File file) {
        this.file = file;
    }

    @Override public String name() {
        return file.getName();
    }

    @Override public long sizeInBytes() {
        return file.length();
    }
}
@ -0,0 +1,109 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.blobstore.fs;
|
||||
|
||||
import org.elasticsearch.common.blobstore.*;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.common.util.concurrent.DynamicExecutors.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class FsBlobStore implements BlobStore {
|
||||
|
||||
private final File path;
|
||||
|
||||
private final ExecutorService executorService;
|
||||
|
||||
public FsBlobStore(Settings settings, File path) {
|
||||
this.path = path;
|
||||
if (!path.exists()) {
|
||||
boolean b = path.mkdirs();
|
||||
if (!b) {
|
||||
throw new BlobStoreException("Failed to create directory at [" + path + "]");
|
||||
}
|
||||
}
|
||||
if (!path.isDirectory()) {
|
||||
throw new BlobStoreException("Path is not a directory at [" + path + "]");
|
||||
}
|
||||
executorService = Executors.newCachedThreadPool(daemonThreadFactory(settings, "fs_blobstore"));
|
||||
}
|
||||
|
||||
@Override public String toString() {
|
||||
return path.toString();
|
||||
}
|
||||
|
||||
public File path() {
|
||||
return path;
|
||||
}
|
||||
|
||||
public ExecutorService executorService() {
|
||||
return executorService;
|
||||
}
|
||||
|
||||
@Override public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
|
||||
return new FsImmutableBlobContainer(this, path, buildAndCreate(path));
|
||||
}
|
||||
|
||||
@Override public AppendableBlobContainer appendableBlobContainer(BlobPath path) {
|
||||
return new FsAppendableBlobContainer(this, path, buildAndCreate(path));
|
||||
}
|
||||
|
||||
@Override public void delete(BlobPath path) {
|
||||
FileSystemUtils.deleteRecursively(buildPath(path));
|
||||
}
|
||||
|
||||
@Override public void close() {
|
||||
executorService.shutdown();
|
||||
try {
|
||||
executorService.awaitTermination(10, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
executorService.shutdownNow();
|
||||
}
|
||||
|
||||
private synchronized File buildAndCreate(BlobPath path) {
|
||||
File f = buildPath(path);
|
||||
f.mkdirs();
|
||||
return f;
|
||||
}
|
||||
|
||||
private File buildPath(BlobPath path) {
|
||||
String[] paths = path.toArray();
|
||||
if (paths.length == 0) {
|
||||
return path();
|
||||
}
|
||||
File blobPath = new File(this.path, paths[0]);
|
||||
if (paths.length > 1) {
|
||||
for (int i = 1; i < paths.length; i++) {
|
||||
blobPath = new File(blobPath, paths[i]);
|
||||
}
|
||||
}
|
||||
return blobPath;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,87 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.blobstore.fs;
|
||||
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
|
||||
import org.elasticsearch.common.blobstore.support.BlobStores;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
|
||||
import java.io.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class FsImmutableBlobContainer extends AbstractFsBlobContainer implements ImmutableBlobContainer {
|
||||
|
||||
public FsImmutableBlobContainer(FsBlobStore blobStore, BlobPath blobPath, File path) {
|
||||
super(blobStore, blobPath, path);
|
||||
}
|
||||
|
||||
@Override public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) {
|
||||
blobStore.executorService().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
File file = new File(path, blobName);
|
||||
RandomAccessFile raf = null;
|
||||
try {
|
||||
raf = new RandomAccessFile(file, "rw");
|
||||
} catch (FileNotFoundException e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
try {
|
||||
try {
|
||||
byte[] buffer = new byte[16 * 1024];
|
||||
int bytesRead;
|
||||
while ((bytesRead = is.read(buffer)) != -1) {
|
||||
raf.write(buffer, 0, bytesRead);
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
is.close();
|
||||
} catch (IOException ex) {
|
||||
// do nothing
|
||||
}
|
||||
try {
|
||||
raf.close();
|
||||
} catch (IOException ex) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
FileSystemUtils.syncFile(file);
|
||||
listener.onCompleted();
|
||||
} catch (Exception e) {
|
||||
// just on the safe size, try and delete it on failure
|
||||
try {
|
||||
if (file.exists()) {
|
||||
file.delete();
|
||||
}
|
||||
} catch (Exception e1) {
|
||||
// ignore
|
||||
}
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException {
|
||||
BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,111 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.blobstore.support;
|
||||
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public abstract class AbstractBlobContainer implements BlobContainer {
|
||||
|
||||
private final BlobPath path;
|
||||
|
||||
protected AbstractBlobContainer(BlobPath path) {
|
||||
this.path = path;
|
||||
}
|
||||
|
||||
@Override public BlobPath path() {
|
||||
return this.path;
|
||||
}
|
||||
|
||||
@Override public byte[] readBlobFully(String blobName) throws IOException {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
|
||||
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
|
||||
readBlob(blobName, new ReadBlobListener() {
|
||||
@Override public void onPartial(byte[] data, int offset, int size) {
|
||||
bos.write(data, offset, size);
|
||||
}
|
||||
|
||||
@Override public void onCompleted() {
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override public void onFailure(Throwable t) {
|
||||
failure.set(t);
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException("Interrupted while waiting to read [" + blobName + "]");
|
||||
}
|
||||
|
||||
if (failure.get() != null) {
|
||||
if (failure.get() instanceof IOException) {
|
||||
throw (IOException) failure.get();
|
||||
} else {
|
||||
throw new IOException("Failed to get [" + blobName + "]", failure.get());
|
||||
}
|
||||
}
|
||||
return bos.toByteArray();
|
||||
}
|
||||
|
||||
@Override public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
|
||||
ImmutableMap<String, BlobMetaData> allBlobs = listBlobs();
|
||||
ImmutableMap.Builder<String, BlobMetaData> blobs = ImmutableMap.builder();
|
||||
for (BlobMetaData blob : allBlobs.values()) {
|
||||
if (blob.name().startsWith(blobNamePrefix)) {
|
||||
blobs.put(blob.name(), blob);
|
||||
}
|
||||
}
|
||||
return blobs.build();
|
||||
}
|
||||
|
||||
@Override public void deleteBlobsByPrefix(final String blobNamePrefix) throws IOException {
|
||||
deleteBlobsByFilter(new BlobNameFilter() {
|
||||
@Override public boolean accept(String blobName) {
|
||||
return blobName.startsWith(blobNamePrefix);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override public void deleteBlobsByFilter(BlobNameFilter filter) throws IOException {
|
||||
ImmutableMap<String, BlobMetaData> blobs = listBlobs();
|
||||
for (BlobMetaData blob : blobs.values()) {
|
||||
if (filter.accept(blob.name())) {
|
||||
deleteBlob(blob.name());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@@ -0,0 +1,62 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.blobstore.support;

import org.elasticsearch.common.blobstore.ImmutableBlobContainer;

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

/**
 * @author kimchy (shay.banon)
 */
public class BlobStores {

    public static void syncWriteBlob(ImmutableBlobContainer blobContainer, String blobName, InputStream is, long sizeInBytes) throws IOException {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
        blobContainer.writeBlob(blobName, is, sizeInBytes, new ImmutableBlobContainer.WriterListener() {
            @Override public void onCompleted() {
                latch.countDown();
            }

            @Override public void onFailure(Throwable t) {
                failure.set(t);
                latch.countDown();
            }
        });
        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new InterruptedIOException("Interrupted while waiting to write [" + blobName + "]");
        }

        if (failure.get() != null) {
            if (failure.get() instanceof IOException) {
                throw (IOException) failure.get();
            } else {
                throw new IOException("Failed to get [" + blobName + "]", failure.get());
            }
        }
    }
}
@@ -0,0 +1,45 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.blobstore.support;

import org.elasticsearch.common.blobstore.BlobMetaData;

/**
 * @author kimchy (shay.banon)
 */
public class PlainBlobMetaData implements BlobMetaData {

    private final String name;

    private final long sizeInBytes;

    public PlainBlobMetaData(String name, long sizeInBytes) {
        this.name = name;
        this.sizeInBytes = sizeInBytes;
    }

    @Override public String name() {
        return this.name;
    }

    @Override public long sizeInBytes() {
        return this.sizeInBytes;
    }
}
@@ -35,7 +35,7 @@ import java.io.*;
 */
public abstract class Streams {

    public static final int BUFFER_SIZE = 4096;
    public static final int BUFFER_SIZE = 1024 * 8;


    //---------------------------------------------------------------------
@@ -50,7 +50,7 @@ public abstract class Streams {
     * @return the number of bytes copied
     * @throws IOException in case of I/O errors
     */
    public static int copy(File in, File out) throws IOException {
    public static long copy(File in, File out) throws IOException {
        Preconditions.checkNotNull(in, "No input File specified");
        Preconditions.checkNotNull(out, "No output File specified");
        return copy(new BufferedInputStream(new FileInputStream(in)),
@@ -98,11 +98,11 @@ public abstract class Streams {
     * @return the number of bytes copied
     * @throws IOException in case of I/O errors
     */
    public static int copy(InputStream in, OutputStream out) throws IOException {
    public static long copy(InputStream in, OutputStream out) throws IOException {
        Preconditions.checkNotNull(in, "No InputStream specified");
        Preconditions.checkNotNull(out, "No OutputStream specified");
        try {
            int byteCount = 0;
            long byteCount = 0;
            byte[] buffer = new byte[BUFFER_SIZE];
            int bytesRead;
            while ((bytesRead = in.read(buffer)) != -1) {
@@ -45,6 +45,10 @@ public class BytesStreamInput extends StreamInput {
        this.count = count;
    }

    public int position() {
        return this.pos;
    }

    @Override public int read() throws IOException {
        return (pos < count) ? (buf[pos++] & 0xff) : -1;
    }
@@ -28,6 +28,8 @@ import org.elasticsearch.common.inject.Module;
 */
public interface Gateway extends LifecycleComponent<Gateway> {

    String type();

    void write(MetaData metaData) throws GatewayException;

    MetaData read() throws GatewayException;
@ -0,0 +1,184 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.gateway.blobstore;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.blobstore.*;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.builder.BinaryXContentBuilder;
|
||||
import org.elasticsearch.gateway.Gateway;
|
||||
import org.elasticsearch.gateway.GatewayException;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public abstract class BlobStoreGateway extends AbstractLifecycleComponent<Gateway> implements Gateway {
|
||||
|
||||
private BlobStore blobStore;
|
||||
|
||||
private ByteSizeValue chunkSize;
|
||||
|
||||
private BlobPath basePath;
|
||||
|
||||
private ImmutableBlobContainer metaDataBlobContainer;
|
||||
|
||||
private volatile int currentIndex;
|
||||
|
||||
protected BlobStoreGateway(Settings settings) throws IOException {
|
||||
super(settings);
|
||||
}
|
||||
|
||||
protected void initialize(BlobStore blobStore, ClusterName clusterName) throws IOException {
|
||||
this.blobStore = blobStore;
|
||||
this.chunkSize = componentSettings.getAsBytesSize("chunk_size", null);
|
||||
this.basePath = BlobPath.cleanPath().add(clusterName.value());
|
||||
this.metaDataBlobContainer = blobStore.immutableBlobContainer(basePath.add("metadata"));
|
||||
this.currentIndex = findLatestIndex();
|
||||
logger.debug("Latest metadata found at index [" + currentIndex + "]");
|
||||
}
|
||||
|
||||
@Override public String toString() {
|
||||
return type() + "://" + blobStore + "/" + basePath;
|
||||
}
|
||||
|
||||
public BlobStore blobStore() {
|
||||
return blobStore;
|
||||
}
|
||||
|
||||
public BlobPath basePath() {
|
||||
return basePath;
|
||||
}
|
||||
|
||||
public ByteSizeValue chunkSize() {
|
||||
return this.chunkSize;
|
||||
}
|
||||
|
||||
@Override public void reset() throws Exception {
|
||||
blobStore.delete(BlobPath.cleanPath());
|
||||
}
|
||||
|
||||
@Override protected void doStart() throws ElasticSearchException {
|
||||
}
|
||||
|
||||
@Override protected void doStop() throws ElasticSearchException {
|
||||
}
|
||||
|
||||
@Override protected void doClose() throws ElasticSearchException {
|
||||
}
|
||||
|
||||
@Override public MetaData read() throws GatewayException {
|
||||
try {
|
||||
this.currentIndex = findLatestIndex();
|
||||
} catch (IOException e) {
|
||||
throw new GatewayException("Failed to find latest metadata to read from", e);
|
||||
}
|
||||
if (currentIndex == -1)
|
||||
return null;
|
||||
String metaData = "metadata-" + currentIndex;
|
||||
|
||||
try {
|
||||
return readMetaData(metaDataBlobContainer.readBlobFully(metaData));
|
||||
} catch (GatewayException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
throw new GatewayException("Failed to read metadata [" + metaData + "] from gateway", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void write(MetaData metaData) throws GatewayException {
|
||||
final String newMetaData = "metadata-" + (currentIndex + 1);
|
||||
BinaryXContentBuilder builder;
|
||||
try {
|
||||
builder = XContentFactory.contentBinaryBuilder(XContentType.JSON);
|
||||
builder.prettyPrint();
|
||||
builder.startObject();
|
||||
MetaData.Builder.toXContent(metaData, builder, ToXContent.EMPTY_PARAMS);
|
||||
builder.endObject();
|
||||
} catch (IOException e) {
|
||||
throw new GatewayException("Failed to serialize metadata into gateway", e);
|
||||
}
|
||||
|
||||
try {
|
||||
metaDataBlobContainer.writeBlob(newMetaData, new ByteArrayInputStream(builder.unsafeBytes(), 0, builder.unsafeBytesLength()), builder.unsafeBytesLength());
|
||||
} catch (IOException e) {
|
||||
throw new GatewayException("Failed to write metadata [" + newMetaData + "]", e);
|
||||
}
|
||||
|
||||
currentIndex++;
|
||||
|
||||
try {
|
||||
metaDataBlobContainer.deleteBlobsByFilter(new BlobContainer.BlobNameFilter() {
|
||||
@Override public boolean accept(String blobName) {
|
||||
return blobName.startsWith("metadata-") && !newMetaData.equals(blobName);
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
logger.debug("Failed to delete old metadata, will do it next time", e);
|
||||
}
|
||||
}
|
||||
|
||||
private int findLatestIndex() throws IOException {
|
||||
ImmutableMap<String, BlobMetaData> blobs = metaDataBlobContainer.listBlobsByPrefix("metadata-");
|
||||
|
||||
int index = -1;
|
||||
for (BlobMetaData md : blobs.values()) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[findLatestMetadata]: Processing [" + md.name() + "]");
|
||||
}
|
||||
String name = md.name();
|
||||
int fileIndex = Integer.parseInt(name.substring(name.indexOf('-') + 1));
|
||||
if (fileIndex >= index) {
|
||||
// try and read the meta data
|
||||
try {
|
||||
readMetaData(metaDataBlobContainer.readBlobFully(name));
|
||||
index = fileIndex;
|
||||
} catch (IOException e) {
|
||||
logger.warn("[findLatestMetadata]: Failed to read metadata from [" + name + "], ignoring...", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return index;
|
||||
}
|
||||
|
||||
private MetaData readMetaData(byte[] data) throws IOException {
|
||||
XContentParser parser = null;
|
||||
try {
|
||||
parser = XContentFactory.xContent(XContentType.JSON).createParser(data);
|
||||
return MetaData.Builder.fromXContent(parser, settings);
|
||||
} finally {
|
||||
if (parser != null) {
|
||||
parser.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,207 +19,42 @@
|
|||
|
||||
package org.elasticsearch.gateway.fs;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.builder.BinaryXContentBuilder;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.gateway.Gateway;
|
||||
import org.elasticsearch.gateway.GatewayException;
|
||||
import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
|
||||
import org.elasticsearch.index.gateway.fs.FsIndexGatewayModule;
|
||||
|
||||
import java.io.*;
|
||||
|
||||
import static org.elasticsearch.common.io.FileSystemUtils.*;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class FsGateway extends AbstractLifecycleComponent<Gateway> implements Gateway {
|
||||
|
||||
private final Environment environment;
|
||||
|
||||
private final ClusterName clusterName;
|
||||
|
||||
private final String location;
|
||||
|
||||
private final File gatewayHome;
|
||||
|
||||
private volatile int currentIndex;
|
||||
public class FsGateway extends BlobStoreGateway {
|
||||
|
||||
@Inject public FsGateway(Settings settings, Environment environment, ClusterName clusterName) throws IOException {
|
||||
super(settings);
|
||||
this.clusterName = clusterName;
|
||||
this.environment = environment;
|
||||
|
||||
this.location = componentSettings.get("location");
|
||||
|
||||
this.gatewayHome = createGatewayHome(location, environment, clusterName);
|
||||
|
||||
if (!gatewayHome.exists()) {
|
||||
throw new IOException("FsGateway location [" + gatewayHome + "] can't be created");
|
||||
}
|
||||
|
||||
this.currentIndex = findLatestIndex(gatewayHome);
|
||||
logger.debug("Latest metadata found at index [" + currentIndex + "]");
|
||||
}
|
||||
|
||||
@Override protected void doStart() throws ElasticSearchException {
|
||||
}
|
||||
|
||||
@Override protected void doStop() throws ElasticSearchException {
|
||||
}
|
||||
|
||||
@Override protected void doClose() throws ElasticSearchException {
|
||||
}
|
||||
|
||||
public File gatewayHome() {
|
||||
return gatewayHome;
|
||||
}
|
||||
|
||||
private static File createGatewayHome(String location, Environment environment, ClusterName clusterName) {
|
||||
File f;
|
||||
if (location != null) {
|
||||
// if its a custom location, append the cluster name to it just so we have unique
|
||||
// in case two clusters point to the same location
|
||||
f = new File(new File(location), clusterName.value());
|
||||
File gatewayFile;
|
||||
String location = componentSettings.get("location");
|
||||
if (location == null) {
|
||||
logger.warn("using local fs location for gateway, should be changed to be a shared location across nodes");
|
||||
gatewayFile = new File(environment.workFile(), "gateway");
|
||||
} else {
|
||||
// work already includes the cluster name
|
||||
f = new File(environment.workWithClusterFile(), "gateway");
|
||||
gatewayFile = new File(location);
|
||||
}
|
||||
if (f.exists() && f.isDirectory()) {
|
||||
return f;
|
||||
}
|
||||
boolean result;
|
||||
for (int i = 0; i < 5; i++) {
|
||||
result = f.mkdirs();
|
||||
if (result) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return f;
|
||||
initialize(new FsBlobStore(componentSettings, gatewayFile), clusterName);
|
||||
}
|
||||
|
||||
@Override public void write(MetaData metaData) throws GatewayException {
|
||||
try {
|
||||
final File file = new File(gatewayHome, "metadata-" + (currentIndex + 1));
|
||||
for (int i = 0; i < 5; i++) {
|
||||
if (file.createNewFile())
|
||||
break;
|
||||
}
|
||||
if (!file.exists()) {
|
||||
throw new GatewayException("Failed to create new file [" + file + "]");
|
||||
}
|
||||
|
||||
BinaryXContentBuilder builder = XContentFactory.contentBinaryBuilder(XContentType.JSON);
|
||||
builder.prettyPrint();
|
||||
builder.startObject();
|
||||
MetaData.Builder.toXContent(metaData, builder, ToXContent.EMPTY_PARAMS);
|
||||
builder.endObject();
|
||||
|
||||
FileOutputStream fileStream = new FileOutputStream(file);
|
||||
fileStream.write(builder.unsafeBytes(), 0, builder.unsafeBytesLength());
|
||||
fileStream.close();
|
||||
|
||||
syncFile(file);
|
||||
|
||||
currentIndex++;
|
||||
|
||||
//delete old files.
|
||||
File[] oldFiles = gatewayHome.listFiles(new FilenameFilter() {
|
||||
@Override public boolean accept(File dir, String name) {
|
||||
return name.startsWith("metadata-") && !name.equals(file.getName());
|
||||
}
|
||||
});
|
||||
|
||||
for (File oldFile : oldFiles) {
|
||||
oldFile.delete();
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
throw new GatewayException("can't write new metadata file into the gateway", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override public MetaData read() throws GatewayException {
|
||||
try {
|
||||
if (currentIndex == -1)
|
||||
return null;
|
||||
|
||||
File file = new File(gatewayHome, "metadata-" + currentIndex);
|
||||
if (!file.exists()) {
|
||||
throw new GatewayException("can't find current metadata file");
|
||||
}
|
||||
return readMetaData(file);
|
||||
} catch (GatewayException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
throw new GatewayException("can't read metadata file from the gateway", e);
|
||||
}
|
||||
@Override public String type() {
|
||||
return "fs";
|
||||
}
|
||||
|
||||
@Override public Class<? extends Module> suggestIndexGateway() {
|
||||
return FsIndexGatewayModule.class;
|
||||
}
|
||||
|
||||
@Override public void reset() {
|
||||
FileSystemUtils.deleteRecursively(gatewayHome, false);
|
||||
currentIndex = -1;
|
||||
}
|
||||
|
||||
private int findLatestIndex(File gatewayHome) {
|
||||
File[] files = gatewayHome.listFiles(new FilenameFilter() {
|
||||
@Override public boolean accept(File dir, String name) {
|
||||
return name.startsWith("metadata-");
|
||||
}
|
||||
});
|
||||
|
||||
int index = -1;
|
||||
for (File file : files) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[findLatestMetadata]: Processing file [" + file + "]");
|
||||
}
|
||||
String name = file.getName();
|
||||
int fileIndex = Integer.parseInt(name.substring(name.indexOf('-') + 1));
|
||||
if (fileIndex >= index) {
|
||||
// try and read the meta data
|
||||
try {
|
||||
readMetaData(file);
|
||||
index = fileIndex;
|
||||
} catch (IOException e) {
|
||||
logger.warn("[findLatestMetadata]: Failed to read metadata from [" + file + "], ignoring...", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return index;
|
||||
}
|
||||
|
||||
private MetaData readMetaData(File file) throws IOException {
|
||||
FileInputStream fileStream = new FileInputStream(file);
|
||||
XContentParser parser = null;
|
||||
try {
|
||||
parser = XContentFactory.xContent(XContentType.JSON).createParser(fileStream);
|
||||
return MetaData.Builder.fromXContent(parser, settings);
|
||||
} finally {
|
||||
if (parser != null) {
|
||||
parser.close();
|
||||
}
|
||||
try {
|
||||
fileStream.close();
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -23,7 +23,7 @@ import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.gateway.Gateway;

/**
 * @author kimchy (Shay Banon)
 * @author kimchy (shay.banon)
 */
public class FsGatewayModule extends AbstractModule {

@@ -34,10 +34,16 @@ import org.elasticsearch.index.gateway.none.NoneIndexGatewayModule;
 */
public class NoneGateway extends AbstractLifecycleComponent<Gateway> implements Gateway {

    public static final String TYPE = "none";

    @Inject public NoneGateway(Settings settings) {
        super(settings);
    }

    @Override public String type() {
        return TYPE;
    }

    @Override protected void doStart() throws ElasticSearchException {
    }

@@ -27,6 +27,8 @@ import org.elasticsearch.index.IndexComponent;
 */
public interface IndexGateway extends IndexComponent, CloseableIndexComponent {

    String type();

    Class<? extends IndexShardGateway> shardGatewayClass();

}
|
|
|
@ -34,6 +34,8 @@ import static org.elasticsearch.common.unit.TimeValue.*;
|
|||
*/
|
||||
public interface IndexShardGateway extends IndexShardComponent, CloseableIndexComponent {
|
||||
|
||||
String type();
|
||||
|
||||
/**
|
||||
* Recovers the state of the shard from the gateway.
|
||||
*/
|
||||
|
@ -42,7 +44,7 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
|
|||
/**
|
||||
* Snapshots the given shard into the gateway.
|
||||
*/
|
||||
SnapshotStatus snapshot(Snapshot snapshot);
|
||||
SnapshotStatus snapshot(Snapshot snapshot) throws IndexShardGatewaySnapshotFailedException;
|
||||
|
||||
/**
|
||||
* Returns <tt>true</tt> if this gateway requires scheduling management for snapshot
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.gateway.blobstore;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.gateway.Gateway;
|
||||
import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
|
||||
import org.elasticsearch.gateway.none.NoneGateway;
|
||||
import org.elasticsearch.index.AbstractIndexComponent;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.gateway.IndexGateway;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public abstract class BlobStoreIndexGateway extends AbstractIndexComponent implements IndexGateway {
|
||||
|
||||
private final BlobStoreGateway gateway;
|
||||
|
||||
private final BlobStore blobStore;
|
||||
|
||||
private final BlobPath indexPath;
|
||||
|
||||
protected ByteSizeValue chunkSize;
|
||||
|
||||
protected BlobStoreIndexGateway(Index index, @IndexSettings Settings indexSettings, Gateway gateway) {
|
||||
super(index, indexSettings);
|
||||
|
||||
if (gateway.type().equals(NoneGateway.TYPE)) {
|
||||
logger.warn("index gateway is configured, but no cluster level gateway configured, cluster level metadata will be lost on full shutdown");
|
||||
}
|
||||
|
||||
this.gateway = (BlobStoreGateway) gateway;
|
||||
this.blobStore = this.gateway.blobStore();
|
||||
|
||||
this.chunkSize = componentSettings.getAsBytesSize("chunk_size", this.gateway.chunkSize());
|
||||
|
||||
this.indexPath = this.gateway.basePath().add("indices").add(index.name());
|
||||
}
|
||||
|
||||
@Override public String toString() {
|
||||
return type() + "://" + blobStore + "/" + indexPath;
|
||||
}
|
||||
|
||||
public BlobStore blobStore() {
|
||||
return blobStore;
|
||||
}
|
||||
|
||||
public BlobPath indexPath() {
|
||||
return this.indexPath;
|
||||
}
|
||||
|
||||
public ByteSizeValue chunkSize() {
|
||||
return this.chunkSize;
|
||||
}
|
||||
|
||||
@Override public void close(boolean delete) throws ElasticSearchException {
|
||||
if (delete) {
|
||||
blobStore.delete(indexPath);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,589 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.gateway.blobstore;
|
||||
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.common.blobstore.*;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.collect.Lists;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
|
||||
import org.elasticsearch.common.lucene.store.ThreadSafeInputStreamIndexInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.trove.TObjectLongHashMap;
|
||||
import org.elasticsearch.common.trove.TObjectLongIterator;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
|
||||
import org.elasticsearch.index.gateway.IndexGateway;
|
||||
import org.elasticsearch.index.gateway.IndexShardGateway;
|
||||
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
|
||||
import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.service.IndexShard;
|
||||
import org.elasticsearch.index.shard.service.InternalIndexShard;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.elasticsearch.index.translog.TranslogStreams.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway {
|
||||
|
||||
protected final ThreadPool threadPool;
|
||||
|
||||
protected final InternalIndexShard indexShard;
|
||||
|
||||
protected final Store store;
|
||||
|
||||
protected final RecoveryThrottler recoveryThrottler;
|
||||
|
||||
|
||||
protected final ByteSizeValue chunkSize;
|
||||
|
||||
protected final BlobStore blobStore;
|
||||
|
||||
protected final BlobPath shardPath;
|
||||
|
||||
protected final ImmutableBlobContainer indexContainer;
|
||||
|
||||
protected final AppendableBlobContainer translogContainer;
|
||||
|
||||
private volatile AppendableBlobContainer.AppendableBlob translogBlob;
|
||||
|
||||
protected BlobStoreIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway indexGateway,
|
||||
IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) {
|
||||
super(shardId, indexSettings);
|
||||
|
||||
this.threadPool = threadPool;
|
||||
this.indexShard = (InternalIndexShard) indexShard;
|
||||
this.store = store;
|
||||
this.recoveryThrottler = recoveryThrottler;
|
||||
|
||||
this.chunkSize = ((BlobStoreIndexGateway) indexGateway).chunkSize(); // can be null -> no chunking
|
||||
this.blobStore = ((BlobStoreIndexGateway) indexGateway).blobStore();
|
||||
this.shardPath = ((BlobStoreIndexGateway) indexGateway).indexPath().add(Integer.toString(shardId.id()));
|
||||
|
||||
this.indexContainer = blobStore.immutableBlobContainer(shardPath.add("index"));
|
||||
this.translogContainer = blobStore.appendableBlobContainer(shardPath.add("translog"));
|
||||
}
|
||||
|
||||
@Override public String toString() {
|
||||
return type() + "://" + blobStore + "/" + shardPath;
|
||||
}
|
||||
|
||||
@Override public boolean requiresSnapshotScheduling() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override public void close(boolean delete) throws ElasticSearchException {
|
||||
if (translogBlob != null) {
|
||||
translogBlob.close();
|
||||
translogBlob = null;
|
||||
}
|
||||
if (delete) {
|
||||
blobStore.delete(shardPath);
|
||||
}
|
||||
}
|
||||
|
||||
@Override public SnapshotStatus snapshot(final Snapshot snapshot) throws IndexShardGatewaySnapshotFailedException {
|
||||
long totalTimeStart = System.currentTimeMillis();
|
||||
boolean indexDirty = false;
|
||||
|
||||
final SnapshotIndexCommit snapshotIndexCommit = snapshot.indexCommit();
|
||||
final Translog.Snapshot translogSnapshot = snapshot.translogSnapshot();
|
||||
|
||||
ImmutableMap<String, BlobMetaData> indicesBlobs = null;
|
||||
TObjectLongHashMap<String> combinedIndicesBlobs = null;
|
||||
|
||||
int indexNumberOfFiles = 0;
|
||||
long indexTotalFilesSize = 0;
|
||||
long indexTime = 0;
|
||||
if (snapshot.indexChanged()) {
|
||||
long time = System.currentTimeMillis();
|
||||
indexDirty = true;
|
||||
|
||||
try {
|
||||
indicesBlobs = indexContainer.listBlobs();
|
||||
} catch (IOException e) {
|
||||
throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to list indices files from gateway", e);
|
||||
}
|
||||
combinedIndicesBlobs = buildCombinedPartsBlobs(indicesBlobs);
|
||||
|
||||
// snapshot into the index
|
||||
final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length);
|
||||
final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
|
||||
|
||||
for (final String fileName : snapshotIndexCommit.getFiles()) {
|
||||
// don't copy over the segments file, it will be copied over later on as part of the
|
||||
// final snapshot phase
|
||||
if (fileName.equals(snapshotIndexCommit.getSegmentsFileName())) {
|
||||
latch.countDown();
|
||||
continue;
|
||||
}
|
||||
// if the file exists in the gateway, and has the same length, don't copy it over
|
||||
long fileSize;
|
||||
try {
|
||||
fileSize = snapshotIndexCommit.getDirectory().fileLength(fileName);
|
||||
} catch (IOException e) {
|
||||
throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to get length on local store", e);
|
||||
}
|
||||
if (combinedIndicesBlobs.contains(fileName) && combinedIndicesBlobs.get(fileName) == fileSize) {
|
||||
latch.countDown();
|
||||
continue;
|
||||
}
|
||||
|
||||
// we are snapshotting the file
|
||||
|
||||
indexNumberOfFiles++;
|
||||
indexTotalFilesSize += fileSize;
|
||||
|
||||
if (combinedIndicesBlobs.contains(fileName)) {
|
||||
try {
|
||||
indexContainer.deleteBlobsByPrefix(fileName);
|
||||
} catch (IOException e) {
|
||||
logger.debug("failed to delete [" + fileName + "] before snapshotting, ignoring...");
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
snapshotFile(snapshotIndexCommit.getDirectory(), fileName, latch, failures);
|
||||
} catch (IOException e) {
|
||||
failures.add(e);
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
failures.add(e);
|
||||
}
|
||||
if (!failures.isEmpty()) {
|
||||
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", failures.get(failures.size() - 1));
|
||||
}
|
||||
indexTime = System.currentTimeMillis() - time;
|
||||
}
|
||||
|
||||
|
||||
// now handle the translog, if a new one was created or the current one has new operations
|
||||
final AtomicInteger translogNumberOfOperations = new AtomicInteger();
|
||||
long translogTime = 0;
|
||||
|
||||
if (snapshot.newTranslogCreated() || snapshot.sameTranslogNewOperations()) {
|
||||
long time = System.currentTimeMillis();
|
||||
|
||||
if (snapshot.newTranslogCreated() && translogBlob != null) {
|
||||
translogBlob.close();
|
||||
translogBlob = null;
|
||||
}
|
||||
|
||||
if (translogBlob == null) {
|
||||
try {
|
||||
translogBlob = translogContainer.appendBlob("translog-" + translogSnapshot.translogId(), !snapshot.newTranslogCreated());
|
||||
} catch (IOException e) {
|
||||
throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to create translog", e);
|
||||
}
|
||||
}
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
|
||||
translogBlob.append(new AppendableBlobContainer.AppendBlobListener() {
|
||||
@Override public void withStream(StreamOutput os) throws IOException {
|
||||
int deltaNumberOfOperations;
|
||||
Iterable<Translog.Operation> operationsIt;
|
||||
if (snapshot.newTranslogCreated()) {
|
||||
deltaNumberOfOperations = translogSnapshot.size();
|
||||
operationsIt = translogSnapshot;
|
||||
} else {
|
||||
deltaNumberOfOperations = translogSnapshot.size() - snapshot.lastTranslogSize();
|
||||
operationsIt = translogSnapshot.skipTo(snapshot.lastTranslogSize());
|
||||
}
|
||||
os.writeInt(deltaNumberOfOperations);
|
||||
for (Translog.Operation operation : operationsIt) {
|
||||
writeTranslogOperation(os, operation);
|
||||
}
|
||||
translogNumberOfOperations.set(deltaNumberOfOperations);
|
||||
}
|
||||
|
||||
@Override public void onCompleted() {
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override public void onFailure(Throwable t) {
|
||||
failure.set(t);
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
failure.set(e);
|
||||
}
|
||||
|
||||
if (failure.get() != null) {
|
||||
throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to snapshot translog", failure.get());
|
||||
}
|
||||
|
||||
translogTime = System.currentTimeMillis() - time;
|
||||
}
|
||||
|
||||
// now write the segments file
|
||||
if (indexDirty) {
|
||||
try {
|
||||
indexNumberOfFiles++;
|
||||
if (indicesBlobs.containsKey(snapshotIndexCommit.getSegmentsFileName())) {
|
||||
indexContainer.deleteBlob(snapshotIndexCommit.getSegmentsFileName());
|
||||
}
|
||||
indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(snapshotIndexCommit.getSegmentsFileName());
|
||||
long time = System.currentTimeMillis();
|
||||
|
||||
IndexInput indexInput = snapshotIndexCommit.getDirectory().openInput(snapshotIndexCommit.getSegmentsFileName());
|
||||
try {
|
||||
InputStreamIndexInput is = new InputStreamIndexInput(indexInput, Long.MAX_VALUE);
|
||||
indexContainer.writeBlob(snapshotIndexCommit.getSegmentsFileName(), is, is.actualSizeToRead());
|
||||
} finally {
|
||||
try {
|
||||
indexInput.close();
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
indexTime += (System.currentTimeMillis() - time);
|
||||
} catch (Exception e) {
|
||||
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize index snapshot into [" + snapshotIndexCommit.getSegmentsFileName() + "]", e);
|
||||
}
|
||||
}
|
||||
|
||||
// delete the old translog
|
||||
if (snapshot.newTranslogCreated()) {
|
||||
try {
|
||||
translogContainer.deleteBlob("translog-" + snapshot.lastTranslogId());
|
||||
} catch (IOException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
// delete old index files
|
||||
if (indexDirty) {
|
||||
for (TObjectLongIterator<String> it = combinedIndicesBlobs.iterator(); it.hasNext();) {
|
||||
it.advance();
|
||||
boolean found = false;
|
||||
for (final String fileName : snapshotIndexCommit.getFiles()) {
|
||||
if (it.key().equals(fileName)) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
try {
|
||||
indexContainer.deleteBlobsByPrefix(it.key());
|
||||
} catch (IOException e) {
|
||||
logger.debug("failed to delete unused index files, will retry later...", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return new SnapshotStatus(new TimeValue(System.currentTimeMillis() - totalTimeStart),
|
||||
new SnapshotStatus.Index(indexNumberOfFiles, new ByteSizeValue(indexTotalFilesSize), new TimeValue(indexTime)),
|
||||
new SnapshotStatus.Translog(translogNumberOfOperations.get(), new TimeValue(translogTime)));
|
||||
}
|
||||
|
||||
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
|
||||
RecoveryStatus.Index recoveryStatusIndex = recoverIndex();
|
||||
RecoveryStatus.Translog recoveryStatusTranslog = recoverTranslog();
|
||||
return new RecoveryStatus(recoveryStatusIndex, recoveryStatusTranslog);
|
||||
}
|
||||
|
||||
private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException {
|
||||
final ImmutableMap<String, BlobMetaData> blobs;
|
||||
try {
|
||||
blobs = translogContainer.listBlobsByPrefix("translog-");
|
||||
} catch (IOException e) {
|
||||
throw new IndexShardGatewayRecoveryException(shardId, "Failed to list content of gateway", e);
|
||||
}
|
||||
|
||||
List<Long> translogIds = Lists.newArrayList();
|
||||
for (BlobMetaData blob : blobs.values()) {
|
||||
long translogId = Long.parseLong(blob.name().substring(blob.name().indexOf('-') + 1));
|
||||
translogIds.add(translogId);
|
||||
}
|
||||
|
||||
if (translogIds.isEmpty()) {
|
||||
// no recovery file found, start the shard and bail
|
||||
indexShard.start();
|
||||
return new RecoveryStatus.Translog(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES));
|
||||
}
|
||||
|
||||
Collections.sort(translogIds, new Comparator<Long>() {
|
||||
@Override public int compare(Long o1, Long o2) {
|
||||
return (int) (o2 - o1);
|
||||
}
|
||||
});
|
||||
|
||||
// try and recover from the latest translog id down to the first
|
||||
Exception lastException = null;
|
||||
for (Long translogId : translogIds) {
|
||||
try {
|
||||
ArrayList<Translog.Operation> operations = Lists.newArrayList();
|
||||
byte[] translogData = translogContainer.readBlobFully("translog-" + translogId);
|
||||
BytesStreamInput si = new BytesStreamInput(translogData);
|
||||
while (true) {
|
||||
// we recover them in parts, each part contains the number of operations, and then the list of them
|
||||
int numberOfOperations = si.readInt();
|
||||
for (int i = 0; i < numberOfOperations; i++) {
|
||||
operations.add(readTranslogOperation(si));
|
||||
}
|
||||
if (si.position() == translogData.length) {
|
||||
// we have reached the end of the stream, bail
|
||||
break;
|
||||
}
|
||||
}
|
||||
indexShard.performRecovery(operations);
|
||||
return new RecoveryStatus.Translog(indexShard.translog().currentId(), operations.size(), new ByteSizeValue(translogData.length, ByteSizeUnit.BYTES));
|
||||
} catch (Exception e) {
|
||||
lastException = e;
|
||||
logger.debug("Failed to read translog, will try the next one", e);
|
||||
}
|
||||
}
|
||||
throw new IndexShardGatewayRecoveryException(shardId, "Failed to recovery translog", lastException);
|
||||
}
|
||||
|
||||
private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException {
|
||||
final ImmutableMap<String, BlobMetaData> blobs;
|
||||
try {
|
||||
blobs = indexContainer.listBlobs();
|
||||
} catch (IOException e) {
|
||||
throw new IndexShardGatewayRecoveryException(shardId, "Failed to list content of gateway", e);
|
||||
}
|
||||
TObjectLongHashMap<String> combinedBlobs = buildCombinedPartsBlobs(blobs);
|
||||
|
||||
// filter out only the files that we need to recover, and reuse ones that exist in the store
|
||||
List<String> filesToRecover = new ArrayList<String>();
|
||||
for (TObjectLongIterator<String> it = combinedBlobs.iterator(); it.hasNext();) {
|
||||
it.advance();
|
||||
// if the store has the file, and it has the same length, don't recover it
|
||||
try {
|
||||
if (store.directory().fileExists(it.key()) && store.directory().fileLength(it.key()) == it.value()) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("not recovering [{}], exists in local store and has same size [{}]", it.key(), new ByteSizeValue(it.value()));
|
||||
}
|
||||
} else {
|
||||
filesToRecover.add(it.key());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
filesToRecover.add(it.key());
|
||||
logger.debug("failed to check local store for existence of [{}]", it.key());
|
||||
}
|
||||
}
|
||||
|
||||
long totalSize = 0;
|
||||
final AtomicLong throttlingWaitTime = new AtomicLong();
|
||||
final CountDownLatch latch = new CountDownLatch(filesToRecover.size());
|
||||
final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
|
||||
for (final String fileToRecover : filesToRecover) {
|
||||
totalSize += combinedBlobs.get(fileToRecover);
|
||||
if (recoveryThrottler.tryStream(shardId, fileToRecover)) {
|
||||
// we managed to get a recovery going
|
||||
recoverFile(fileToRecover, blobs, latch, failures);
|
||||
} else {
|
||||
// lets reschedule to do it next time
|
||||
threadPool.schedule(new Runnable() {
|
||||
@Override public void run() {
|
||||
throttlingWaitTime.addAndGet(recoveryThrottler.throttleInterval().millis());
|
||||
if (recoveryThrottler.tryStream(shardId, fileToRecover)) {
|
||||
// we managed to get a recovery going
|
||||
recoverFile(fileToRecover, blobs, latch, failures);
|
||||
} else {
|
||||
threadPool.schedule(this, recoveryThrottler.throttleInterval());
|
||||
}
|
||||
}
|
||||
}, recoveryThrottler.throttleInterval());
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new IndexShardGatewayRecoveryException(shardId, "Interrupted while recovering index", e);
|
||||
}
|
||||
|
||||
if (!failures.isEmpty()) {
|
||||
throw new IndexShardGatewayRecoveryException(shardId, "Failed to recovery index", failures.get(0));
|
||||
}
|
||||
|
||||
long version = -1;
|
||||
try {
|
||||
if (IndexReader.indexExists(store.directory())) {
|
||||
version = IndexReader.getCurrentVersion(store.directory());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e);
|
||||
}
|
||||
|
||||
return new RecoveryStatus.Index(version, filesToRecover.size(), new ByteSizeValue(totalSize, ByteSizeUnit.BYTES), TimeValue.timeValueMillis(throttlingWaitTime.get()));
|
||||
}
|
||||
|
||||
private void recoverFile(final String fileToRecover, final ImmutableMap<String, BlobMetaData> blobs, final CountDownLatch latch, final List<Throwable> failures) {
|
||||
final IndexOutput indexOutput;
|
||||
try {
|
||||
indexOutput = store.directory().createOutput(fileToRecover);
|
||||
} catch (IOException e) {
|
||||
recoveryThrottler.streamDone(shardId, fileToRecover);
|
||||
failures.add(e);
|
||||
latch.countDown();
|
||||
return;
|
||||
}
|
||||
final AtomicInteger partIndex = new AtomicInteger();
|
||||
indexContainer.readBlob(fileToRecover, new BlobContainer.ReadBlobListener() {
|
||||
@Override public synchronized void onPartial(byte[] data, int offset, int size) throws IOException {
|
||||
indexOutput.writeBytes(data, offset, size);
|
||||
}
|
||||
|
||||
@Override public synchronized void onCompleted() {
|
||||
int part = partIndex.incrementAndGet();
|
||||
String partName = fileToRecover + ".part" + part;
|
||||
if (blobs.containsKey(partName)) {
|
||||
// continue with the new part
|
||||
indexContainer.readBlob(partName, this);
|
||||
} else {
|
||||
// we are done...
|
||||
try {
|
||||
indexOutput.close();
|
||||
} catch (IOException e) {
|
||||
onFailure(e);
|
||||
}
|
||||
}
|
||||
recoveryThrottler.streamDone(shardId, fileToRecover);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override public void onFailure(Throwable t) {
|
||||
recoveryThrottler.streamDone(shardId, fileToRecover);
|
||||
failures.add(t);
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void snapshotFile(Directory dir, String fileName, final CountDownLatch latch, final List<Throwable> failures) throws IOException {
|
||||
long chunkBytes = Long.MAX_VALUE;
|
||||
if (chunkSize != null) {
|
||||
chunkBytes = chunkSize.bytes();
|
||||
}
|
||||
|
||||
long totalLength = dir.fileLength(fileName);
|
||||
long numberOfChunks = totalLength / chunkBytes;
|
||||
if (totalLength % chunkBytes > 0) {
|
||||
numberOfChunks++;
|
||||
}
|
||||
if (numberOfChunks == 0) {
|
||||
numberOfChunks++;
|
||||
}
|
||||
|
||||
final AtomicLong counter = new AtomicLong(numberOfChunks);
|
||||
for (long i = 0; i < numberOfChunks; i++) {
|
||||
final long chunkNumber = i;
|
||||
|
||||
IndexInput indexInput = null;
|
||||
try {
|
||||
indexInput = dir.openInput(fileName);
|
||||
indexInput.seek(chunkNumber * chunkBytes);
|
||||
InputStreamIndexInput is = new ThreadSafeInputStreamIndexInput(indexInput, chunkBytes);
|
||||
|
||||
String blobName = fileName;
|
||||
if (chunkNumber > 0) {
|
||||
blobName += ".part" + chunkNumber;
|
||||
}
|
||||
|
||||
final IndexInput fIndexInput = indexInput;
|
||||
indexContainer.writeBlob(blobName, is, is.actualSizeToRead(), new ImmutableBlobContainer.WriterListener() {
|
||||
@Override public void onCompleted() {
|
||||
try {
|
||||
fIndexInput.close();
|
||||
} catch (IOException e) {
|
||||
// ignore
|
||||
}
|
||||
if (counter.decrementAndGet() == 0) {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void onFailure(Throwable t) {
|
||||
failures.add(t);
|
||||
if (counter.decrementAndGet() == 0) {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
if (indexInput != null) {
|
||||
try {
|
||||
indexInput.close();
|
||||
} catch (IOException e1) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
failures.add(e);
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private TObjectLongHashMap<String> buildCombinedPartsBlobs(ImmutableMap<String, BlobMetaData> blobs) {
|
||||
TObjectLongHashMap<String> combinedBlobs = new TObjectLongHashMap<String>();
|
||||
for (BlobMetaData blob : blobs.values()) {
|
||||
String cleanName;
|
||||
int partIndex = blob.name().indexOf(".part");
|
||||
if (partIndex == -1) {
|
||||
cleanName = blob.name();
|
||||
} else {
|
||||
cleanName = blob.name().substring(0, partIndex);
|
||||
}
|
||||
combinedBlobs.adjustOrPutValue(cleanName, blob.sizeInBytes(), blob.sizeInBytes());
|
||||
}
|
||||
return combinedBlobs;
|
||||
}
|
||||
}
|
|
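snapshotFile() above splits every Lucene file into at most chunkSize bytes per blob, naming the pieces fileName, fileName.part1, fileName.part2 and so on, and buildCombinedPartsBlobs() undoes that by stripping the .part suffix and summing the part sizes. A small self-contained sketch of that arithmetic; the file name, length, and 128-byte chunk size are example values only (the 128b figure comes from the chunked gateway test config later in this diff):

import java.util.ArrayList;
import java.util.List;

// Sketch of the chunk/part naming used by snapshotFile() and undone by
// buildCombinedPartsBlobs(); file name, length, and chunk size are example values.
public class ChunkNamingSketch {
    static List<String> blobNames(String fileName, long totalLength, long chunkBytes) {
        long numberOfChunks = totalLength / chunkBytes;
        if (totalLength % chunkBytes > 0) {
            numberOfChunks++;          // last, partially filled chunk
        }
        if (numberOfChunks == 0) {
            numberOfChunks++;          // empty files still get one blob
        }
        List<String> names = new ArrayList<String>();
        for (long chunk = 0; chunk < numberOfChunks; chunk++) {
            names.add(chunk == 0 ? fileName : fileName + ".part" + chunk);
        }
        return names;
    }

    public static void main(String[] args) {
        // a 300 byte file with a 128b chunk_size becomes three blobs:
        // _0.cfs, _0.cfs.part1, _0.cfs.part2
        System.out.println(blobNames("_0.cfs", 300, 128));
    }
}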
@ -19,81 +19,29 @@
|
|||
|
||||
package org.elasticsearch.index.gateway.fs;
|
||||
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.gateway.Gateway;
|
||||
import org.elasticsearch.gateway.fs.FsGateway;
|
||||
import org.elasticsearch.index.AbstractIndexComponent;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexException;
|
||||
import org.elasticsearch.index.gateway.IndexGateway;
|
||||
import org.elasticsearch.index.gateway.IndexShardGateway;
|
||||
import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import static org.elasticsearch.common.io.FileSystemUtils.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class FsIndexGateway extends AbstractIndexComponent implements IndexGateway {
|
||||
|
||||
private final String location;
|
||||
|
||||
private File indexGatewayHome;
|
||||
public class FsIndexGateway extends BlobStoreIndexGateway {
|
||||
|
||||
@Inject public FsIndexGateway(Index index, @IndexSettings Settings indexSettings, Environment environment, Gateway gateway) {
|
||||
super(index, indexSettings);
|
||||
super(index, indexSettings, gateway);
|
||||
}
|
||||
|
||||
String location = componentSettings.get("location");
|
||||
if (location == null) {
|
||||
if (gateway instanceof FsGateway) {
|
||||
indexGatewayHome = new File(new File(((FsGateway) gateway).gatewayHome(), "indices"), index.name());
|
||||
} else {
|
||||
indexGatewayHome = new File(new File(new File(environment.workWithClusterFile(), "gateway"), "indices"), index.name());
|
||||
}
|
||||
location = Strings.cleanPath(indexGatewayHome.getAbsolutePath());
|
||||
} else {
|
||||
indexGatewayHome = new File(new File(location), index.name());
|
||||
}
|
||||
this.location = location;
|
||||
|
||||
if (!(indexGatewayHome.exists() && indexGatewayHome.isDirectory())) {
|
||||
boolean result;
|
||||
for (int i = 0; i < 5; i++) {
|
||||
result = indexGatewayHome.mkdirs();
|
||||
if (result) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!(indexGatewayHome.exists() && indexGatewayHome.isDirectory())) {
|
||||
throw new IndexException(index, "Failed to create index gateway at [" + indexGatewayHome + "]");
|
||||
}
|
||||
@Override public String type() {
|
||||
return "fs";
|
||||
}
|
||||
|
||||
@Override public Class<? extends IndexShardGateway> shardGatewayClass() {
|
||||
return FsIndexShardGateway.class;
|
||||
}
|
||||
|
||||
@Override public void close(boolean delete) {
|
||||
if (!delete) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
String[] files = indexGatewayHome.list();
|
||||
if (files == null || files.length == 0) {
|
||||
deleteRecursively(indexGatewayHome, true);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
public File indexGatewayHome() {
|
||||
return this.indexGatewayHome;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,419 +19,28 @@
|
|||
|
||||
package org.elasticsearch.index.gateway.fs;
|
||||
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.elasticsearch.common.collect.Lists;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.DataInputStreamInput;
|
||||
import org.elasticsearch.common.io.stream.DataOutputStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
|
||||
import org.elasticsearch.index.gateway.IndexGateway;
|
||||
import org.elasticsearch.index.gateway.IndexShardGateway;
|
||||
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
|
||||
import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException;
|
||||
import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexShardGateway;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.service.IndexShard;
|
||||
import org.elasticsearch.index.shard.service.InternalIndexShard;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.elasticsearch.common.io.FileSystemUtils.*;
|
||||
import static org.elasticsearch.common.lucene.Directories.*;
|
||||
import static org.elasticsearch.index.translog.TranslogStreams.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class FsIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway {
|
||||
|
||||
private final InternalIndexShard indexShard;
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private final RecoveryThrottler recoveryThrottler;
|
||||
|
||||
private final Store store;
|
||||
|
||||
|
||||
private final boolean nativeCopy;
|
||||
|
||||
private final File location;
|
||||
|
||||
private final File locationIndex;
|
||||
|
||||
private final File locationTranslog;
|
||||
public class FsIndexShardGateway extends BlobStoreIndexShardGateway {
|
||||
|
||||
@Inject public FsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway fsIndexGateway,
|
||||
IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) {
|
||||
super(shardId, indexSettings);
|
||||
this.threadPool = threadPool;
|
||||
this.indexShard = (InternalIndexShard) indexShard;
|
||||
this.store = store;
|
||||
this.recoveryThrottler = recoveryThrottler;
|
||||
|
||||
this.nativeCopy = componentSettings.getAsBoolean("native_copy", true);
|
||||
|
||||
this.location = new File(((FsIndexGateway) fsIndexGateway).indexGatewayHome(), Integer.toString(shardId.id()));
|
||||
this.locationIndex = new File(location, "index");
|
||||
this.locationTranslog = new File(location, "translog");
|
||||
|
||||
locationIndex.mkdirs();
|
||||
locationTranslog.mkdirs();
|
||||
super(shardId, indexSettings, threadPool, fsIndexGateway, indexShard, store, recoveryThrottler);
|
||||
}
|
||||
|
||||
@Override public boolean requiresSnapshotScheduling() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override public String toString() {
|
||||
return "fs[" + location + "]";
|
||||
}
|
||||
|
||||
@Override public void close(boolean delete) {
|
||||
if (delete) {
|
||||
deleteRecursively(location, true);
|
||||
}
|
||||
}
|
||||
|
||||
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
|
||||
RecoveryStatus.Index recoveryStatusIndex = recoverIndex();
|
||||
RecoveryStatus.Translog recoveryStatusTranslog = recoverTranslog();
|
||||
return new RecoveryStatus(recoveryStatusIndex, recoveryStatusTranslog);
|
||||
}
|
||||
|
||||
@Override public SnapshotStatus snapshot(Snapshot snapshot) {
|
||||
long totalTimeStart = System.currentTimeMillis();
|
||||
boolean indexDirty = false;
|
||||
boolean translogDirty = false;
|
||||
|
||||
final SnapshotIndexCommit snapshotIndexCommit = snapshot.indexCommit();
|
||||
final Translog.Snapshot translogSnapshot = snapshot.translogSnapshot();
|
||||
|
||||
int indexNumberOfFiles = 0;
|
||||
long indexTotalFilesSize = 0;
|
||||
long indexTime = 0;
|
||||
if (snapshot.indexChanged()) {
|
||||
long time = System.currentTimeMillis();
|
||||
indexDirty = true;
|
||||
// snapshot into the index
|
||||
final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length);
|
||||
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
|
||||
for (final String fileName : snapshotIndexCommit.getFiles()) {
|
||||
// don't copy over the segments file, it will be copied over later on as part of the
|
||||
// final snapshot phase
|
||||
if (fileName.equals(snapshotIndexCommit.getSegmentsFileName())) {
|
||||
latch.countDown();
|
||||
continue;
|
||||
}
|
||||
IndexInput indexInput = null;
|
||||
try {
|
||||
indexInput = snapshotIndexCommit.getDirectory().openInput(fileName);
|
||||
File snapshotFile = new File(locationIndex, fileName);
|
||||
if (snapshotFile.exists() && (snapshotFile.length() == indexInput.length())) {
|
||||
// we assume it's the same one, no need to copy
|
||||
latch.countDown();
|
||||
continue;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.debug("Failed to verify file equality based on length, copying...", e);
|
||||
} finally {
|
||||
if (indexInput != null) {
|
||||
try {
|
||||
indexInput.close();
|
||||
} catch (IOException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
indexNumberOfFiles++;
|
||||
try {
|
||||
indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(fileName);
|
||||
} catch (IOException e) {
|
||||
// ignore...
|
||||
}
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
File copyTo = new File(locationIndex, fileName);
|
||||
try {
|
||||
copyFromDirectory(snapshotIndexCommit.getDirectory(), fileName, copyTo, nativeCopy);
|
||||
} catch (Exception e) {
|
||||
lastException.set(new IndexShardGatewaySnapshotFailedException(shardId, "Failed to copy to [" + copyTo + "], from dir [" + snapshotIndexCommit.getDirectory() + "] and file [" + fileName + "]", e));
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
lastException.set(e);
|
||||
}
|
||||
if (lastException.get() != null) {
|
||||
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", lastException.get());
|
||||
}
|
||||
indexTime = System.currentTimeMillis() - time;
|
||||
}
|
||||
// we reopen the RAF on each snapshot rather than keeping one open, since we want to make sure
// we can sync it to disk later on (and close it as well)
|
||||
File translogFile = new File(locationTranslog, "translog-" + translogSnapshot.translogId());
|
||||
RandomAccessFile translogRaf = null;
|
||||
|
||||
// if we have a different translogId we want to flush the full translog to a new file (based on the translogId).
|
||||
// If we still work on existing translog, just append the latest translog operations
|
||||
int translogNumberOfOperations = 0;
|
||||
long translogTime = 0;
|
||||
if (snapshot.newTranslogCreated()) {
|
||||
translogDirty = true;
|
||||
try {
|
||||
long time = System.currentTimeMillis();
|
||||
translogRaf = new RandomAccessFile(translogFile, "rw");
|
||||
StreamOutput out = new DataOutputStreamOutput(translogRaf);
|
||||
out.writeInt(-1); // write the number of operations header with -1 currently
|
||||
for (Translog.Operation operation : translogSnapshot) {
|
||||
translogNumberOfOperations++;
|
||||
writeTranslogOperation(out, operation);
|
||||
}
|
||||
translogTime = System.currentTimeMillis() - time;
|
||||
} catch (Exception e) {
|
||||
try {
|
||||
if (translogRaf != null) {
|
||||
translogRaf.close();
|
||||
}
|
||||
} catch (IOException e1) {
|
||||
// ignore
|
||||
}
|
||||
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog into [" + translogFile + "]", e);
|
||||
}
|
||||
} else if (snapshot.sameTranslogNewOperations()) {
|
||||
translogDirty = true;
|
||||
try {
|
||||
long time = System.currentTimeMillis();
|
||||
translogRaf = new RandomAccessFile(translogFile, "rw");
|
||||
// seek to the end, since we append
|
||||
translogRaf.seek(translogRaf.length());
|
||||
StreamOutput out = new DataOutputStreamOutput(translogRaf);
|
||||
for (Translog.Operation operation : translogSnapshot.skipTo(snapshot.lastTranslogSize())) {
|
||||
translogNumberOfOperations++;
|
||||
writeTranslogOperation(out, operation);
|
||||
}
|
||||
translogTime = System.currentTimeMillis() - time;
|
||||
} catch (Exception e) {
|
||||
try {
|
||||
if (translogRaf != null) {
|
||||
translogRaf.close();
|
||||
}
|
||||
} catch (Exception e1) {
|
||||
// ignore
|
||||
}
|
||||
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to append snapshot translog into [" + translogFile + "]", e);
|
||||
}
|
||||
}
|
||||
|
||||
// now write the segments file and update the translog header
|
||||
try {
|
||||
if (indexDirty) {
|
||||
indexNumberOfFiles++;
|
||||
indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(snapshotIndexCommit.getSegmentsFileName());
|
||||
long time = System.currentTimeMillis();
|
||||
copyFromDirectory(snapshotIndexCommit.getDirectory(), snapshotIndexCommit.getSegmentsFileName(),
|
||||
new File(locationIndex, snapshotIndexCommit.getSegmentsFileName()), nativeCopy);
|
||||
indexTime += (System.currentTimeMillis() - time);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
try {
|
||||
if (translogRaf != null) {
|
||||
translogRaf.close();
|
||||
}
|
||||
} catch (Exception e1) {
|
||||
// ignore
|
||||
}
|
||||
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize index snapshot into [" + new File(locationIndex, snapshotIndexCommit.getSegmentsFileName()) + "]", e);
|
||||
}
|
||||
|
||||
try {
|
||||
if (translogDirty) {
|
||||
translogRaf.seek(0);
|
||||
translogRaf.writeInt(translogSnapshot.size());
|
||||
translogRaf.close();
|
||||
|
||||
// now, sync the translog
|
||||
syncFile(translogFile);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (translogRaf != null) {
|
||||
try {
|
||||
translogRaf.close();
|
||||
} catch (Exception e1) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize snapshot into [" + translogFile + "]", e);
|
||||
}
|
||||
|
||||
// delete the old translog
|
||||
if (snapshot.newTranslogCreated()) {
|
||||
new File(locationTranslog, "translog-" + snapshot.lastTranslogId()).delete();
|
||||
}
|
||||
|
||||
// delete files that no longer exists in the index
|
||||
if (indexDirty) {
|
||||
File[] existingFiles = locationIndex.listFiles();
|
||||
for (File existingFile : existingFiles) {
|
||||
boolean found = false;
|
||||
for (final String fileName : snapshotIndexCommit.getFiles()) {
|
||||
if (existingFile.getName().equals(fileName)) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
existingFile.delete();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return new SnapshotStatus(new TimeValue(System.currentTimeMillis() - totalTimeStart),
|
||||
new SnapshotStatus.Index(indexNumberOfFiles, new ByteSizeValue(indexTotalFilesSize), new TimeValue(indexTime)),
|
||||
new SnapshotStatus.Translog(translogNumberOfOperations, new TimeValue(translogTime)));
|
||||
}
|
||||
|
||||
private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException {
|
||||
File[] files = locationIndex.listFiles();
|
||||
final CountDownLatch latch = new CountDownLatch(files.length);
|
||||
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
|
||||
final AtomicLong throttlingWaitTime = new AtomicLong();
|
||||
for (final File file : files) {
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
try {
|
||||
long throttlingStartTime = System.currentTimeMillis();
|
||||
while (!recoveryThrottler.tryStream(shardId, file.getName())) {
|
||||
Thread.sleep(recoveryThrottler.throttleInterval().millis());
|
||||
}
|
||||
throttlingWaitTime.addAndGet(System.currentTimeMillis() - throttlingStartTime);
|
||||
copyToDirectory(file, store.directory(), file.getName(), nativeCopy);
|
||||
} catch (Exception e) {
|
||||
logger.debug("Failed to read [" + file + "] into [" + store + "]", e);
|
||||
lastException.set(e);
|
||||
} finally {
|
||||
recoveryThrottler.streamDone(shardId, file.getName());
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
lastException.set(e);
|
||||
}
|
||||
if (lastException.get() != null) {
|
||||
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to recover index files", lastException.get());
|
||||
}
|
||||
long totalSize = 0;
|
||||
for (File file : files) {
|
||||
totalSize += file.length();
|
||||
}
|
||||
|
||||
long version = -1;
|
||||
try {
|
||||
if (IndexReader.indexExists(store.directory())) {
|
||||
version = IndexReader.getCurrentVersion(store.directory());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e);
|
||||
}
|
||||
|
||||
return new RecoveryStatus.Index(version, files.length, new ByteSizeValue(totalSize, ByteSizeUnit.BYTES), TimeValue.timeValueMillis(throttlingWaitTime.get()));
|
||||
}
|
||||
|
||||
private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException {
|
||||
RandomAccessFile raf = null;
|
||||
try {
|
||||
long recoveryTranslogId = findLatestTranslogId(locationTranslog);
|
||||
if (recoveryTranslogId == -1) {
|
||||
// no recovery file found, start the shard and bail
|
||||
indexShard.start();
|
||||
return new RecoveryStatus.Translog(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES));
|
||||
}
|
||||
File recoveryTranslogFile = new File(locationTranslog, "translog-" + recoveryTranslogId);
|
||||
raf = new RandomAccessFile(recoveryTranslogFile, "r");
|
||||
int numberOfOperations = raf.readInt();
|
||||
ArrayList<Translog.Operation> operations = Lists.newArrayListWithCapacity(numberOfOperations);
|
||||
for (int i = 0; i < numberOfOperations; i++) {
|
||||
operations.add(readTranslogOperation(new DataInputStreamInput(raf)));
|
||||
}
|
||||
indexShard.performRecovery(operations);
|
||||
return new RecoveryStatus.Translog(recoveryTranslogId, operations.size(), new ByteSizeValue(recoveryTranslogFile.length(), ByteSizeUnit.BYTES));
|
||||
} catch (Exception e) {
|
||||
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to perform recovery of translog", e);
|
||||
} finally {
|
||||
if (raf != null) {
|
||||
try {
|
||||
raf.close();
|
||||
} catch (IOException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private long findLatestTranslogId(File location) {
|
||||
File[] files = location.listFiles(new FilenameFilter() {
|
||||
@Override public boolean accept(File dir, String name) {
|
||||
return name.startsWith("translog-");
|
||||
}
|
||||
});
|
||||
if (files == null) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
long index = -1;
|
||||
for (File file : files) {
|
||||
String name = file.getName();
|
||||
RandomAccessFile raf = null;
|
||||
try {
|
||||
raf = new RandomAccessFile(file, "r");
|
||||
// if the header is -1, then it's not properly written, ignore it
|
||||
if (raf.readInt() == -1) {
|
||||
continue;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// broken file, continue
|
||||
continue;
|
||||
} finally {
|
||||
try {
|
||||
raf.close();
|
||||
} catch (IOException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
long fileIndex = Long.parseLong(name.substring(name.indexOf('-') + 1));
|
||||
if (fileIndex >= index) {
|
||||
index = fileIndex;
|
||||
}
|
||||
}
|
||||
|
||||
return index;
|
||||
@Override public String type() {
|
||||
return "fs";
|
||||
}
|
||||
}
|
||||
|
|
|
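One difference worth spelling out between the two shard gateway implementations: the fs version above writes a single RandomAccessFile per translog id and back-patches one operation-count header at offset 0 once the snapshot completes, whereas the blob-store version earlier in this diff appends self-describing groups, each prefixed by its own operation count, and recovery keeps reading groups until it reaches the end of the blob. A hedged sketch of that grouped framing, with plain ints standing in for the operations that writeTranslogOperation/readTranslogOperation actually serialize:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of the grouped translog framing used by BlobStoreIndexShardGateway:
// each append writes an int count followed by that many operations; recovery
// reads groups until the end of the data. Ints stand in for real operations.
public class GroupedTranslogFramingSketch {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        writeGroup(out, 1, 2);      // first append: 2 operations
        writeGroup(out, 3, 4, 5);   // later append to the same blob: 3 operations

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        int total = 0;
        while (in.available() > 0) {          // the si.position() == length check in the real code
            int numberOfOperations = in.readInt();
            for (int i = 0; i < numberOfOperations; i++) {
                in.readInt();                 // readTranslogOperation(si) in the real code
            }
            total += numberOfOperations;
        }
        System.out.println("recovered " + total + " operations");
    }

    static void writeGroup(DataOutputStream out, int... ops) throws IOException {
        out.writeInt(ops.length);
        for (int op : ops) {
            out.writeInt(op);
        }
    }
}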
@ -28,7 +28,7 @@ import org.elasticsearch.index.gateway.IndexShardGateway;
|
|||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class NoneIndexGateway extends AbstractIndexComponent implements IndexGateway {
|
||||
|
||||
|
@ -36,6 +36,10 @@ public class NoneIndexGateway extends AbstractIndexComponent implements IndexGat
|
|||
super(index, indexSettings);
|
||||
}
|
||||
|
||||
@Override public String type() {
|
||||
return "none";
|
||||
}
|
||||
|
||||
@Override public Class<? extends IndexShardGateway> shardGatewayClass() {
|
||||
return NoneIndexShardGateway.class;
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.elasticsearch.common.inject.AbstractModule;
|
|||
import org.elasticsearch.index.gateway.IndexGateway;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class NoneIndexGatewayModule extends AbstractModule {
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.gateway.none.NoneGateway;
|
||||
import org.elasticsearch.index.gateway.IndexShardGateway;
|
||||
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
|
@ -50,6 +51,10 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
|
|||
return new RecoveryStatus(new RecoveryStatus.Index(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES), TimeValue.timeValueMillis(0)), new RecoveryStatus.Translog(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES)));
|
||||
}
|
||||
|
||||
@Override public String type() {
|
||||
return NoneGateway.TYPE;
|
||||
}
|
||||
|
||||
@Override public SnapshotStatus snapshot(Snapshot snapshot) {
|
||||
return SnapshotStatus.NA;
|
||||
}
|
||||
|
|
|
@ -39,9 +39,9 @@ public class StreamsTests {
|
|||
byte[] content = "content".getBytes();
|
||||
ByteArrayInputStream in = new ByteArrayInputStream(content);
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream(content.length);
|
||||
int count = copy(in, out);
|
||||
long count = copy(in, out);
|
||||
|
||||
assertThat(count, equalTo(content.length));
|
||||
assertThat(count, equalTo((long) content.length));
|
||||
assertThat(Arrays.equals(content, out.toByteArray()), equalTo(true));
|
||||
}
|
||||
|
||||
|
|
|
@ -52,10 +52,11 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests
|
|||
buildNode("server1");
|
||||
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
|
||||
((InternalNode) node("server1")).injector().getInstance(Gateway.class).reset();
|
||||
closeAllNodes();
|
||||
}
|
||||
|
||||
@Test public void testSnapshotOperations() throws Exception {
|
||||
node("server1").start();
|
||||
startNode("server1");
|
||||
|
||||
// Translog tests
|
||||
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.test.integration.gateway.fs;

import org.elasticsearch.test.integration.gateway.AbstractSimpleIndexGatewayTests;

/**
 * @author kimchy (shay.banon)
 */
public class ChunkFsIndexGatewayTests extends AbstractSimpleIndexGatewayTests {

}
@ -0,0 +1,10 @@
|
|||
cluster:
    routing:
        schedule: 100ms
gateway:
    type: fs
    fs:
        chunk_size: 128b
index:
    number_of_shards: 1
    number_of_replicas: 1
@ -30,7 +30,7 @@ import org.testng.annotations.Test;
|
|||
import static org.elasticsearch.client.Requests.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class FsMetaDataGatewayTests extends AbstractNodesTests {
|
||||
|
||||
|
@ -41,16 +41,15 @@ public class FsMetaDataGatewayTests extends AbstractNodesTests {
|
|||
closeAllNodes();
|
||||
}
|
||||
|
||||
@BeforeMethod void buildNode1() throws Exception {
|
||||
@BeforeMethod void buildNodeToReset() throws Exception {
|
||||
buildNode("server1");
|
||||
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
|
||||
((InternalNode) node("server1")).injector().getInstance(Gateway.class).reset();
|
||||
closeAllNodes();
|
||||
}
|
||||
|
||||
@Test public void testIndexActions() throws Exception {
|
||||
buildNode("server1");
|
||||
((InternalNode) node("server1")).injector().getInstance(Gateway.class).reset();
|
||||
node("server1").start();
|
||||
startNode("server1");
|
||||
|
||||
client("server1").admin().indices().create(createIndexRequest("test")).actionGet();
|
||||
|
||||
|
|
|
@ -103,6 +103,10 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
|
|||
logger.debug("Latest metadata found at index [" + currentIndex + "]");
|
||||
}
|
||||
|
||||
@Override public String type() {
|
||||
return "cloud";
|
||||
}
|
||||
|
||||
public String container() {
|
||||
return this.container;
|
||||
}
|
||||
|
|
|
@ -107,6 +107,10 @@ public class CloudIndexGateway extends AbstractIndexComponent implements IndexGa
|
|||
logger.debug("Using location [{}], container [{}], index_directory [{}], chunk_size [{}]", this.location, this.indexContainer, this.indexDirectory, this.chunkSize);
|
||||
}
|
||||
|
||||
@Override public String type() {
|
||||
return "cloud";
|
||||
}
|
||||
|
||||
public Location indexLocation() {
|
||||
return this.location;
|
||||
}
|
||||
|
|
|
@ -120,6 +120,10 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
|
|||
logger.trace("Using location [{}], container [{}], shard_directory [{}]", this.shardLocation, this.container, this.shardDirectory);
|
||||
}
|
||||
|
||||
@Override public String type() {
|
||||
return "cloud";
|
||||
}
|
||||
|
||||
@Override public boolean requiresSnapshotScheduling() {
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -90,6 +90,10 @@ public class HdfsGateway extends AbstractLifecycleComponent<Gateway> implements
|
|||
logger.debug("Latest metadata found at index [" + currentIndex + "]");
|
||||
}
|
||||
|
||||
@Override public String type() {
|
||||
return "fs";
|
||||
}
|
||||
|
||||
public FileSystem fileSystem() {
|
||||
return this.fileSystem;
|
||||
}
|
||||
|
|
|
@ -64,6 +64,10 @@ public class HdfsIndexGateway extends AbstractIndexComponent implements IndexGat
|
|||
this.indexPath = new Path(new Path(path, "indices"), index.name());
|
||||
}
|
||||
|
||||
@Override public String type() {
|
||||
return "hdfs";
|
||||
}
|
||||
|
||||
public FileSystem fileSystem() {
|
||||
return this.fileSystem;
|
||||
}
|
||||
|
|
|
@ -114,6 +114,10 @@ public class HdfsIndexShardGateway extends AbstractIndexShardComponent implement
|
|||
}
|
||||
}
|
||||
|
||||
@Override public String type() {
|
||||
return "hdfs";
|
||||
}
|
||||
|
||||
@Override public boolean requiresSnapshotScheduling() {
|
||||
return true;
|
||||
}
|
||||
|
|