YARN-5967. Fix slider core module findbugs warnings. Contributed by Jian He
This commit is contained in:
parent
c11f4b3c21
commit
db96e8aa21
|
@ -0,0 +1,96 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<FindBugsFilter>
|
||||
<Match>
|
||||
<Package name="org.apache.slider.api.proto" />
|
||||
</Match>
|
||||
<Match>
|
||||
<class name="~org\.apache\.slider\.*" />
|
||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.slider.core.zk.BlockingZKWatcher" />
|
||||
<Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.slider.server.appmaster.state.ProviderAppState" />
|
||||
<Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.slider.providers.ProviderUtils" />
|
||||
<Bug pattern="SWL_SLEEP_WITH_LOCK_HELD" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.slider.server.appmaster.SliderAppMaster" />
|
||||
<Bug pattern="WA_AWAIT_NOT_IN_LOOP" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.slider.core.zk.ZKIntegration" />
|
||||
<Bug pattern="SBSC_USE_STRINGBUFFER_CONCATENATION" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Bug pattern="NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.slider.server.servicemonitor.ProbeWorker" />
|
||||
<Bug pattern="SF_SWITCH_FALLTHROUGH" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.slider.core.persist.JsonSerDeser" />
|
||||
<Bug pattern="UI_INHERITANCE_UNSAFE_GETRESOURCE" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.slider.server.appmaster.rpc.SliderAMPolicyProvider" />
|
||||
<Bug pattern="EI_EXPOSE_REP" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.slider.server.appmaster.state.OutstandingRequest" />
|
||||
<Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.slider.server.appmaster.web.SliderAMWebApp" />
|
||||
<Bug pattern="LG_LOST_LOGGER_DUE_TO_WEAK_REFERENCE" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.slider.server.servicemonitor.Probe" />
|
||||
<Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.slider.server.services.workflow.ForkedProcessService" />
|
||||
<Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.slider.server.appmaster.state.RoleInstance"/>
|
||||
<Bug pattern="UWF_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD"/>
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.slider.client.SliderClient" />
|
||||
<Method name="actionRegistryListConfigsYarn" />
|
||||
<Bug pattern="OS_OPEN_STREAM" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.slider.client.SliderClient" />
|
||||
<Method name="actionRegistryListExports" />
|
||||
<Bug pattern="OS_OPEN_STREAM" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.slider.common.tools.SliderUtils" />
|
||||
<Method name="getApplicationResourceInputStream" />
|
||||
<Bug pattern="OS_OPEN_STREAM" />
|
||||
</Match>
|
||||
</FindBugsFilter>
|
|
@ -1,20 +0,0 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<FindBugsFilter>
|
||||
|
||||
</FindBugsFilter>
|
|
@ -26,6 +26,11 @@
|
|||
<packaging>jar</packaging>
|
||||
<name>Apache Hadoop YARN Slider Core</name>
|
||||
|
||||
<properties>
|
||||
<!-- Needed for generating FindBugs warnings using parent pom -->
|
||||
<yarn.basedir>${project.parent.basedir}</yarn.basedir>
|
||||
</properties>
|
||||
|
||||
<build>
|
||||
<!-- resources are filtered for dynamic updates. This gets build info in-->
|
||||
<resources>
|
||||
|
|
|
@ -35,6 +35,7 @@ import java.io.Closeable;
|
|||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.io.PrintWriter;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.InetAddress;
|
||||
|
@ -82,7 +83,7 @@ public class KerberosDiags implements Closeable {
|
|||
|
||||
private final Configuration conf;
|
||||
private final List<String> services;
|
||||
private final PrintWriter out;
|
||||
private final PrintStream out;
|
||||
private final File keytab;
|
||||
private final String principal;
|
||||
private final long minKeyLength;
|
||||
|
@ -97,7 +98,7 @@ public class KerberosDiags implements Closeable {
|
|||
|
||||
@SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
|
||||
public KerberosDiags(Configuration conf,
|
||||
PrintWriter out,
|
||||
PrintStream out,
|
||||
List<String> services,
|
||||
File keytab,
|
||||
String principal,
|
||||
|
|
|
@ -31,7 +31,7 @@ import java.util.Map;
|
|||
* Serializable version of component data.
|
||||
* <p>
|
||||
* This is sent in REST calls as a JSON object —but is also marshalled into
|
||||
* a protobuf structure. Look at {@link org.apache.slider.api.proto.RestTypeMarshalling}
|
||||
* a protobuf structure. Look at {@link RestTypeMarshalling}
|
||||
* for the specifics there.
|
||||
* <p>
|
||||
* This means that if any fields are added here. they must be added to
|
||||
|
|
|
@ -16,8 +16,9 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.slider.api.proto;
|
||||
package org.apache.slider.api.types;
|
||||
|
||||
import org.apache.slider.api.proto.Messages;
|
||||
import org.apache.slider.api.types.ApplicationLivenessInformation;
|
||||
import org.apache.slider.api.types.ComponentInformation;
|
||||
import org.apache.slider.api.types.ContainerInformation;
|
||||
|
@ -66,7 +67,7 @@ public class RestTypeMarshalling {
|
|||
info.name = wire.getName();
|
||||
info.priority = wire.getPriority();
|
||||
info.placementPolicy = wire.getPlacementPolicy();
|
||||
|
||||
|
||||
info.actual = wire.getActual();
|
||||
info.completed = wire.getCompleted();
|
||||
info.desired = wire.getDesired();
|
||||
|
@ -95,7 +96,7 @@ public class RestTypeMarshalling {
|
|||
builder.setName(info.name);
|
||||
builder.setPriority(info.priority);
|
||||
builder.setPlacementPolicy(info.placementPolicy);
|
||||
|
||||
|
||||
builder.setActual(info.actual);
|
||||
builder.setCompleted(info.completed);
|
||||
builder.setDesired(info.desired);
|
||||
|
@ -269,7 +270,7 @@ public class RestTypeMarshalling {
|
|||
IOException {
|
||||
return new ConfTreeSerDeser().fromJson(wire.getJson());
|
||||
}
|
||||
|
||||
|
||||
public static ConfTreeOperations unmarshallToCTO(Messages.WrappedJsonProto wire) throws
|
||||
IOException {
|
||||
return new ConfTreeOperations(new ConfTreeSerDeser().fromJson(wire.getJson()));
|
|
@ -192,10 +192,10 @@ import java.io.PrintStream;
|
|||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.io.Writer;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -1322,9 +1322,9 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
|
|||
}
|
||||
|
||||
private void initializeOutputStream(String outFile)
|
||||
throws FileNotFoundException {
|
||||
throws IOException {
|
||||
if (outFile != null) {
|
||||
clientOutputStream = new PrintStream(new FileOutputStream(outFile));
|
||||
clientOutputStream = new PrintStream(outFile, "UTF-8");
|
||||
} else {
|
||||
clientOutputStream = System.out;
|
||||
}
|
||||
|
@ -3299,7 +3299,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
|
|||
int updateCount = Integer.parseInt(updateCountStr);
|
||||
// if component was specified before, get the current count
|
||||
if (component.get(COMPONENT_INSTANCES) != null) {
|
||||
currentCount = Integer.valueOf(component.get(COMPONENT_INSTANCES));
|
||||
currentCount = Integer.parseInt(component.get(COMPONENT_INSTANCES));
|
||||
if (currentCount + updateCount < 0) {
|
||||
throw new BadCommandArgumentsException("The requested count " +
|
||||
"of \"%s\" for role %s makes the total number of " +
|
||||
|
@ -3610,16 +3610,16 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
|
|||
// as this is an API entry point, validate
|
||||
// the arguments
|
||||
args.validate();
|
||||
RegistryOperations operations = getRegistryOperations();
|
||||
String path = SliderRegistryUtils.resolvePath(args.path);
|
||||
ServiceRecordMarshal serviceRecordMarshal = new ServiceRecordMarshal();
|
||||
try {
|
||||
if (args.list) {
|
||||
File destDir = args.destdir;
|
||||
if (destDir != null) {
|
||||
destDir.mkdirs();
|
||||
if (destDir != null && !destDir.exists() && !destDir.mkdirs()) {
|
||||
throw new IOException("Failed to create directory: " + destDir);
|
||||
}
|
||||
|
||||
|
||||
Map<String, ServiceRecord> recordMap;
|
||||
Map<String, RegistryPathStatus> znodes;
|
||||
try {
|
||||
|
@ -3656,9 +3656,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
|
|||
} else {
|
||||
String filename = RegistryPathUtils.lastPathEntry(name) + ".json";
|
||||
File jsonFile = new File(destDir, filename);
|
||||
write(jsonFile,
|
||||
serviceRecordMarshal.toBytes(instance),
|
||||
true);
|
||||
write(jsonFile, serviceRecordMarshal.toBytes(instance));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -3669,7 +3667,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
|
|||
outFile = new File(args.destdir, RegistryPathUtils.lastPathEntry(path));
|
||||
}
|
||||
if (outFile != null) {
|
||||
write(outFile, serviceRecordMarshal.toBytes(instance), true);
|
||||
write(outFile, serviceRecordMarshal.toBytes(instance));
|
||||
} else {
|
||||
println(serviceRecordMarshal.toJson(instance));
|
||||
}
|
||||
|
@ -4062,11 +4060,13 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
|
|||
@SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
|
||||
private int actionKDiag(ActionKDiagArgs args)
|
||||
throws Exception {
|
||||
PrintWriter out = new PrintWriter(System.err);
|
||||
PrintStream out;
|
||||
boolean closeStream = false;
|
||||
if (args.out != null) {
|
||||
out = new PrintWriter(new FileOutputStream(args.out));
|
||||
out = new PrintStream(args.out, "UTF-8");
|
||||
closeStream = true;
|
||||
} else {
|
||||
out = System.err;
|
||||
}
|
||||
try {
|
||||
KerberosDiags kdiags = new KerberosDiags(getConfig(),
|
||||
|
@ -4137,7 +4137,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
|
|||
PrintStream out = null;
|
||||
try {
|
||||
if (registryArgs.out != null) {
|
||||
out = new PrintStream(new FileOutputStream(registryArgs.out));
|
||||
out = new PrintStream(registryArgs.out, "UTF-8");
|
||||
} else {
|
||||
out = System.out;
|
||||
}
|
||||
|
@ -4145,11 +4145,8 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
|
|||
if (!registryArgs.verbose) {
|
||||
out.println(configName);
|
||||
} else {
|
||||
PublishedConfiguration published =
|
||||
configurations.get(configName);
|
||||
out.printf("%s: %s\n",
|
||||
configName,
|
||||
published.description);
|
||||
PublishedConfiguration published = configurations.get(configName);
|
||||
out.printf("%s: %s%n", configName, published.description);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
@ -4178,7 +4175,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
|
|||
boolean streaming = false;
|
||||
try {
|
||||
if (registryArgs.out != null) {
|
||||
out = new PrintStream(new FileOutputStream(registryArgs.out));
|
||||
out = new PrintStream(registryArgs.out, "UTF-8");
|
||||
streaming = true;
|
||||
log.debug("Saving output to {}", registryArgs.out);
|
||||
} else {
|
||||
|
@ -4193,9 +4190,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
|
|||
out.println(exportName);
|
||||
} else {
|
||||
PublishedExports published = exports.get(exportName);
|
||||
out.printf("%s: %s\n",
|
||||
exportName,
|
||||
published.description);
|
||||
out.printf("%s: %s%n", exportName, published.description);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
@ -4401,9 +4396,8 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
|
|||
* Output to standard out/stderr (implementation specific detail)
|
||||
* @param src source
|
||||
*/
|
||||
@SuppressWarnings("UseOfSystemOutOrSystemErr")
|
||||
private static void print(CharSequence src) {
|
||||
clientOutputStream.append(src);
|
||||
clientOutputStream.print(src);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -4411,8 +4405,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
|
|||
* @param message message
|
||||
*/
|
||||
private static void println(String message) {
|
||||
print(message);
|
||||
print("\n");
|
||||
clientOutputStream.println(message);
|
||||
}
|
||||
/**
|
||||
* Output to standard out/stderr with a newline after, formatted
|
||||
|
@ -4420,8 +4413,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
|
|||
* @param args arguments for string formatting
|
||||
*/
|
||||
private static void println(String message, Object ... args) {
|
||||
print(String.format(message, args));
|
||||
print("\n");
|
||||
clientOutputStream.println(String.format(message, args));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -4497,12 +4489,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
|
|||
throw new UsageException(CommonArgs.usage(serviceArgs, actionName));
|
||||
}
|
||||
|
||||
private int actionHelp(String errMsg, String actionName)
|
||||
throws YarnException, IOException {
|
||||
throw new UsageException("%s %s", errMsg, CommonArgs.usage(serviceArgs,
|
||||
actionName));
|
||||
}
|
||||
|
||||
/**
|
||||
* List the nodes in the cluster, possibly filtering by node state or label.
|
||||
*
|
||||
|
|
|
@ -75,11 +75,10 @@ public class TokensOperation {
|
|||
}
|
||||
saveTokens(output, credentials);
|
||||
String filename = output.getCanonicalPath();
|
||||
footnote = String.format("%d tokens saved to %s\n" +
|
||||
"To use these in the environment:\n" +
|
||||
"export %s=%s",
|
||||
credentials.numberOfTokens(),
|
||||
filename, UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, filename);
|
||||
footnote = String.format(
|
||||
"%d tokens saved to %s%n" + "To use these in the environment:%n"
|
||||
+ "export %s=%s", credentials.numberOfTokens(), filename,
|
||||
UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, filename);
|
||||
} else if (args.source != null) {
|
||||
File source = args.source;
|
||||
log.info("Reading credentials from file {}", source);
|
||||
|
|
|
@ -51,7 +51,7 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.slider.api.proto.RestTypeMarshalling.*;
|
||||
import static org.apache.slider.api.types.RestTypeMarshalling.*;
|
||||
|
||||
/**
|
||||
* Cluster operations at a slightly higher level than the RPC code
|
||||
|
|
|
@ -297,20 +297,6 @@ public class SliderApplicationApiRestClient extends BaseRestClient
|
|||
.type(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(PingInformation.class, f);
|
||||
}
|
||||
|
||||
/**
|
||||
* Ping as a POST
|
||||
* @param text text to include
|
||||
* @return the response
|
||||
* @throws IOException on any failure
|
||||
*/
|
||||
public PingInformation pingPut(String text) throws IOException {
|
||||
WebResource pingResource = applicationResource(ACTION_PING);
|
||||
Form f = new Form();
|
||||
return pingResource
|
||||
.type(MediaType.TEXT_PLAIN)
|
||||
.put(PingInformation.class, text);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(String text) throws IOException {
|
||||
|
|
|
@ -18,6 +18,9 @@
|
|||
|
||||
package org.apache.slider.common;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Keys and various constants for Slider
|
||||
|
@ -79,8 +82,9 @@ public interface SliderKeys extends SliderXmlConfKeys {
|
|||
*/
|
||||
String COMPONENT_TYPE_EXTERNAL_APP = "external_app";
|
||||
String COMPONENT_SEPARATOR = "-";
|
||||
String[] COMPONENT_KEYS_TO_SKIP = {"zookeeper.", "env.MALLOC_ARENA_MAX",
|
||||
"site.fs.", "site.dfs."};
|
||||
List<String> COMPONENT_KEYS_TO_SKIP = Collections.unmodifiableList(Arrays
|
||||
.asList("zookeeper.", "env.MALLOC_ARENA_MAX", "site.fs.", "site.dfs."));
|
||||
|
||||
/**
|
||||
* A component type for a client component
|
||||
*/
|
||||
|
|
|
@ -142,11 +142,12 @@ public abstract class AbstractActionArgs extends ArgOps implements Arguments {
|
|||
|
||||
log.error(message);
|
||||
int index = 1;
|
||||
StringBuilder buf = new StringBuilder(message);
|
||||
for (String actionArg : parameters) {
|
||||
log.error("[{}] \"{}\"", index++, actionArg);
|
||||
message += " \"" + actionArg + "\" ";
|
||||
buf.append(" \"" + actionArg + "\" ");
|
||||
}
|
||||
throw new BadCommandArgumentsException(message);
|
||||
throw new BadCommandArgumentsException(buf.toString());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -214,7 +214,6 @@ public abstract class AbstractClusterBuildingActionArgs extends
|
|||
protected ConfTree buildConfTree(Map<String, String> optionsMap) throws
|
||||
BadCommandArgumentsException {
|
||||
ConfTree confTree = new ConfTree();
|
||||
ConfTreeOperations ops = new ConfTreeOperations(confTree);
|
||||
confTree.global.putAll(optionsMap);
|
||||
return confTree;
|
||||
}
|
||||
|
|
|
@ -154,12 +154,12 @@ public abstract class CommonArgs extends ArgOps implements SliderActions,
|
|||
public void parse() throws SliderException {
|
||||
addActionArguments();
|
||||
try {
|
||||
commander.parse(getArgs());
|
||||
commander.parse(args);
|
||||
} catch (ParameterException e) {
|
||||
throw new BadCommandArgumentsException(e, "%s in %s",
|
||||
e.toString(),
|
||||
(getArgs() != null
|
||||
? (SliderUtils.join(getArgs(),
|
||||
(args != null
|
||||
? (SliderUtils.join(args,
|
||||
" ", false))
|
||||
: "[]"));
|
||||
}
|
||||
|
@ -297,7 +297,4 @@ public abstract class CommonArgs extends ArgOps implements SliderActions,
|
|||
return coreAction.parameters;
|
||||
}
|
||||
|
||||
public String[] getArgs() {
|
||||
return args;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,24 +29,17 @@ public class Comparators {
|
|||
public static class LongComparator implements Comparator<Long>, Serializable {
|
||||
@Override
|
||||
public int compare(Long o1, Long o2) {
|
||||
long result = o1 - o2;
|
||||
// need to comparisons with a diff greater than integer size
|
||||
if (result < 0 ) {
|
||||
return -1;
|
||||
} else if (result > 0) {
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
public static class InvertedLongComparator implements Comparator<Long>, Serializable {
|
||||
private static final LongComparator inner = new LongComparator();
|
||||
@Override
|
||||
public int compare(Long o1, Long o2) {
|
||||
return -inner.compare(o1, o2);
|
||||
return o1.compareTo(o2);
|
||||
}
|
||||
}
|
||||
|
||||
public static class InvertedLongComparator
|
||||
implements Comparator<Long>, Serializable {
|
||||
@Override
|
||||
public int compare(Long o1, Long o2) {
|
||||
return o2.compareTo(o1);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Little template class to reverse any comparitor
|
||||
|
|
|
@ -194,7 +194,7 @@ public class ConfigHelper {
|
|||
byte[] data = loadBytes(fs, path);
|
||||
//this is here to track down a parse issue
|
||||
//related to configurations
|
||||
String s = new String(data, 0, data.length);
|
||||
String s = new String(data, 0, data.length, "UTF-8");
|
||||
log.debug("XML resource {} is \"{}\"", path, s);
|
||||
/* JDK7
|
||||
try (ByteArrayInputStream in = new ByteArrayInputStream(data)) {
|
||||
|
|
|
@ -463,37 +463,6 @@ public class CoreFileSystem {
|
|||
return isFile;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that a file exists in the zip file given by path
|
||||
* @param path path to zip file
|
||||
* @param file file expected to be in zip
|
||||
* @throws FileNotFoundException file not found or is not a zip file
|
||||
* @throws IOException trouble with FS
|
||||
*/
|
||||
public void verifyFileExistsInZip(Path path, String file) throws IOException {
|
||||
fileSystem.copyToLocalFile(path, new Path("/tmp"));
|
||||
File dst = new File((new Path("/tmp", path.getName())).toString());
|
||||
Enumeration<? extends ZipEntry> entries;
|
||||
ZipFile zipFile = new ZipFile(dst);
|
||||
boolean found = false;
|
||||
|
||||
try {
|
||||
entries = zipFile.entries();
|
||||
while (entries.hasMoreElements()) {
|
||||
ZipEntry entry = entries.nextElement();
|
||||
String nm = entry.getName();
|
||||
if (nm.endsWith(file)) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
zipFile.close();
|
||||
}
|
||||
dst.delete();
|
||||
if (!found) throw new FileNotFoundException("file: " + file + " not found in " + path);
|
||||
log.info("Verification of " + path + " passed");
|
||||
}
|
||||
/**
|
||||
* Create the application-instance specific temporary directory
|
||||
* in the DFS
|
||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.hadoop.fs.FileUtil;
|
|||
import org.apache.hadoop.fs.GlobFilter;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
|
@ -310,10 +309,6 @@ public final class SliderUtils {
|
|||
}
|
||||
String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
|
||||
Enumeration<URL> urlEnumeration = loader.getResources(class_file);
|
||||
if (urlEnumeration == null) {
|
||||
throw new IOException("Unable to find resources for class " + my_class);
|
||||
}
|
||||
|
||||
for (; urlEnumeration.hasMoreElements(); ) {
|
||||
URL url = urlEnumeration.nextElement();
|
||||
if ("jar".equals(url.getProtocol())) {
|
||||
|
@ -756,10 +751,10 @@ public final class SliderUtils {
|
|||
public static String containersToString(
|
||||
List<ContainerInformation> containers, String version,
|
||||
Set<String> components) {
|
||||
String containerf = " %-28s %30s %45s %s\n";
|
||||
String containerf = " %-28s %30s %45s %s%n";
|
||||
StringBuilder builder = new StringBuilder(512);
|
||||
builder.append("Containers:\n");
|
||||
builder.append(String.format(" %-28s %30s %45s %s\n", "Component Name",
|
||||
builder.append("Containers:%n");
|
||||
builder.append(String.format(" %-28s %30s %45s %s%n", "Component Name",
|
||||
"App Version", "Container Id", "Container Info/Logs"));
|
||||
for (ContainerInformation container : containers) {
|
||||
if (filter(container.appVersion, version)
|
||||
|
@ -969,7 +964,7 @@ public final class SliderUtils {
|
|||
*/
|
||||
public static Map<String, String> mergeMapsIgnoreDuplicateKeysAndPrefixes(
|
||||
Map<String, String> first, Map<String, String> second,
|
||||
String... prefixes) {
|
||||
List<String> prefixes) {
|
||||
Preconditions.checkArgument(first != null, "Null 'first' value");
|
||||
Preconditions.checkArgument(second != null, "Null 'second' value");
|
||||
Preconditions.checkArgument(prefixes != null, "Null 'prefixes' value");
|
||||
|
@ -2119,15 +2114,16 @@ public final class SliderUtils {
|
|||
is = new ByteArrayInputStream(content);
|
||||
} else {
|
||||
log.debug("Size unknown. Reading {}", zipEntry.getName());
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
while (true) {
|
||||
int byteRead = zis.read();
|
||||
if (byteRead == -1) {
|
||||
break;
|
||||
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
|
||||
while (true) {
|
||||
int byteRead = zis.read();
|
||||
if (byteRead == -1) {
|
||||
break;
|
||||
}
|
||||
baos.write(byteRead);
|
||||
}
|
||||
baos.write(byteRead);
|
||||
is = new ByteArrayInputStream(baos.toByteArray());
|
||||
}
|
||||
is = new ByteArrayInputStream(baos.toByteArray());
|
||||
}
|
||||
done = true;
|
||||
}
|
||||
|
@ -2204,91 +2200,27 @@ public final class SliderUtils {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Look for the windows executable and check it has the right headers.
|
||||
* <code>File.canRead()</code> doesn't work on windows, so the reading
|
||||
* is mandatory.
|
||||
*
|
||||
* @param program program name for errors
|
||||
* @param exe executable
|
||||
* @throws IOException IOE
|
||||
*/
|
||||
public static void verifyWindowsExe(String program, File exe)
|
||||
throws IOException {
|
||||
verifyIsFile(program, exe);
|
||||
|
||||
verifyFileSize(program, exe, 0x100);
|
||||
|
||||
// now read two bytes and verify the header.
|
||||
try(FileReader reader = new FileReader(exe)) {
|
||||
int[] header = new int[2];
|
||||
header[0] = reader.read();
|
||||
header[1] = reader.read();
|
||||
if ((header[0] != 'M' || header[1] != 'Z')) {
|
||||
throw new FileNotFoundException(program
|
||||
+ " at " + exe
|
||||
+ " is not a windows executable file");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that a Unix exe works
|
||||
* @param program program name for errors
|
||||
* @param exe executable
|
||||
* @throws IOException IOE
|
||||
|
||||
*/
|
||||
public static void verifyUnixExe(String program, File exe)
|
||||
throws IOException {
|
||||
verifyIsFile(program, exe);
|
||||
|
||||
// read flag
|
||||
if (!exe.canRead()) {
|
||||
throw new IOException("Cannot read " + program + " at " + exe);
|
||||
}
|
||||
// exe flag
|
||||
if (!exe.canExecute()) {
|
||||
throw new IOException("Cannot execute " + program + " at " + exe);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate an executable
|
||||
* @param program program name for errors
|
||||
* @param exe program to look at
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void validateExe(String program, File exe) throws IOException {
|
||||
if (!Shell.WINDOWS) {
|
||||
verifyWindowsExe(program, exe);
|
||||
} else {
|
||||
verifyUnixExe(program, exe);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write bytes to a file
|
||||
* @param outfile output file
|
||||
* @param data data to write
|
||||
* @param createParent flag to indicate that the parent dir should
|
||||
* be created
|
||||
* @throws IOException on any IO problem
|
||||
*/
|
||||
public static void write(File outfile, byte[] data, boolean createParent)
|
||||
public static void write(File outfile, byte[] data)
|
||||
throws IOException {
|
||||
File parentDir = outfile.getCanonicalFile().getParentFile();
|
||||
if (parentDir == null) {
|
||||
throw new IOException(outfile.getPath() + " has no parent dir");
|
||||
}
|
||||
if (createParent) {
|
||||
parentDir.mkdirs();
|
||||
if (!parentDir.exists()) {
|
||||
if(!parentDir.mkdirs()) {
|
||||
throw new IOException("Failed to create parent directory " + parentDir);
|
||||
}
|
||||
}
|
||||
SliderUtils.verifyIsDir(parentDir, log);
|
||||
try(FileOutputStream out = new FileOutputStream(outfile)) {
|
||||
out.write(data);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -142,7 +142,6 @@ public class InstanceBuilder {
|
|||
md.put(StatusKeys.INFO_CREATE_TIME_HUMAN, SliderUtils.toGMTString(time));
|
||||
md.put(StatusKeys.INFO_CREATE_TIME_MILLIS, Long.toString(time));
|
||||
|
||||
MapOperations globalOptions = internalOps.getGlobalOptions();
|
||||
BuildHelper.addBuildMetadata(md, "create");
|
||||
SliderUtils.setInfoTime(md,
|
||||
StatusKeys.INFO_CREATE_TIME_HUMAN,
|
||||
|
|
|
@ -163,7 +163,7 @@ public final class AggregateConf {
|
|||
public String getPassphrase() {
|
||||
if (passphrase == null) {
|
||||
passphrase = RandomStringUtils.randomAlphanumeric(
|
||||
Integer.valueOf(SliderKeys.PASS_LEN));
|
||||
Integer.parseInt(SliderKeys.PASS_LEN));
|
||||
}
|
||||
|
||||
return passphrase;
|
||||
|
|
|
@ -79,17 +79,6 @@ public final class ConfTree {
|
|||
public Map<String, Map<String, String>> components =
|
||||
new HashMap<>(INITAL_MAP_CAPACITY);
|
||||
|
||||
|
||||
/**
|
||||
* Shallow clone
|
||||
* @return a shallow clone
|
||||
* @throws CloneNotSupportedException
|
||||
*/
|
||||
@Override
|
||||
public Object clone() throws CloneNotSupportedException {
|
||||
return super.clone();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
try {
|
||||
|
@ -105,9 +94,7 @@ public final class ConfTree {
|
|||
* @return a JSON string description
|
||||
* @throws IOException Problems mapping/writing the object
|
||||
*/
|
||||
public String toJson() throws IOException,
|
||||
JsonGenerationException,
|
||||
JsonMappingException {
|
||||
public String toJson() throws IOException {
|
||||
return ConfTreeSerDeser.toString(this);
|
||||
}
|
||||
|
||||
|
|
|
@ -147,7 +147,9 @@ public class AppDefinitionPersister {
|
|||
|
||||
File tempDir = Files.createTempDir();
|
||||
File pkgSrcDir = new File(tempDir, "default");
|
||||
pkgSrcDir.mkdirs();
|
||||
if (!pkgSrcDir.exists() && !pkgSrcDir.mkdirs()) {
|
||||
throw new IOException("Failed to create directory: " + pkgSrcDir);
|
||||
}
|
||||
File destMetaInfo = new File(pkgSrcDir, "metainfo.json");
|
||||
if (isFileUsed) {
|
||||
if (buildInfo.appMetaInfo.getName().endsWith(".xml")) {
|
||||
|
@ -194,12 +196,13 @@ public class AppDefinitionPersister {
|
|||
|
||||
List<String> addons = new ArrayList<String>();
|
||||
Map<String, String> addonMap = buildInfo.addonDelegate.getAddonMap();
|
||||
for (String key : addonMap.keySet()) {
|
||||
File defPath = new File(addonMap.get(key));
|
||||
if (SliderUtils.isUnset(addonMap.get(key))) {
|
||||
for (Map.Entry<String, String > entry : addonMap.entrySet()) {
|
||||
String key = entry.getKey();
|
||||
String value = entry.getValue();
|
||||
if (SliderUtils.isUnset(value)) {
|
||||
throw new BadConfigException("Invalid path for addon package " + key);
|
||||
}
|
||||
|
||||
File defPath = new File(value);
|
||||
if (!defPath.exists()) {
|
||||
throw new BadConfigException("addon folder or package path is not valid.");
|
||||
}
|
||||
|
@ -234,7 +237,7 @@ public class AppDefinitionPersister {
|
|||
}
|
||||
|
||||
// Helper class to hold details for the app and addon packages
|
||||
public class AppDefinition {
|
||||
static class AppDefinition {
|
||||
// The target folder where the package will be stored
|
||||
public Path targetFolderInFs;
|
||||
// The on disk location of the app def package or folder
|
||||
|
|
|
@ -62,61 +62,10 @@ public class AMWebClient {
|
|||
restClient = new BaseRestClient(binding.createJerseyClient());
|
||||
|
||||
}
|
||||
|
||||
|
||||
private static URLConnectionClientHandler getUrlConnectionClientHandler() {
|
||||
return new URLConnectionClientHandler(new HttpURLConnectionFactory() {
|
||||
@Override
|
||||
public HttpURLConnection getHttpURLConnection(URL url)
|
||||
throws IOException {
|
||||
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
|
||||
if (connection.getResponseCode() == HttpURLConnection.HTTP_MOVED_TEMP) {
|
||||
// is a redirect - are we changing schemes?
|
||||
String redirectLocation = connection.getHeaderField(HttpHeaders.LOCATION);
|
||||
String originalScheme = url.getProtocol();
|
||||
String redirectScheme = URI.create(redirectLocation).getScheme();
|
||||
if (!originalScheme.equals(redirectScheme)) {
|
||||
// need to fake it out by doing redirect ourselves
|
||||
log.info("Protocol change during redirect. Redirecting {} to URL {}",
|
||||
url, redirectLocation);
|
||||
URL redirectURL = new URL(redirectLocation);
|
||||
connection = (HttpURLConnection) redirectURL.openConnection();
|
||||
}
|
||||
}
|
||||
if (connection instanceof HttpsURLConnection) {
|
||||
log.debug("Attempting to configure HTTPS connection using client "
|
||||
+ "configuration");
|
||||
final SSLFactory factory;
|
||||
final SSLSocketFactory sf;
|
||||
final HostnameVerifier hv;
|
||||
|
||||
try {
|
||||
HttpsURLConnection c = (HttpsURLConnection) connection;
|
||||
factory = new SSLFactory(SSLFactory.Mode.CLIENT, new Configuration());
|
||||
factory.init();
|
||||
sf = factory.createSSLSocketFactory();
|
||||
hv = factory.getHostnameVerifier();
|
||||
c.setSSLSocketFactory(sf);
|
||||
c.setHostnameVerifier(hv);
|
||||
} catch (Exception e) {
|
||||
log.info("Unable to configure HTTPS connection from "
|
||||
+ "configuration. Using JDK properties.");
|
||||
}
|
||||
|
||||
}
|
||||
return connection;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public WebResource resource(String url) {
|
||||
return restClient.resource(url);
|
||||
}
|
||||
|
||||
public BaseRestClient getRestClient() {
|
||||
return restClient;
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the operation. Failures are raised as IOException subclasses
|
||||
* @param method method to execute
|
||||
|
|
|
@ -1,34 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.slider.core.restclient;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A response for use as a return value from operations
|
||||
*/
|
||||
public class HttpOperationResponse {
|
||||
|
||||
public int responseCode;
|
||||
public long lastModified;
|
||||
public String contentType;
|
||||
public byte[] data;
|
||||
public Map<String, List<String>> headers;
|
||||
}
|
|
@ -87,124 +87,4 @@ public class UrlConnectionOperations extends Configured {
|
|||
Preconditions.checkArgument(url.getPort() != 0, "no port");
|
||||
return (HttpURLConnection) connectionFactory.openConnection(url, useSpnego);
|
||||
}
|
||||
|
||||
public HttpOperationResponse execGet(URL url) throws
|
||||
IOException,
|
||||
AuthenticationException {
|
||||
return execHttpOperation(HttpVerb.GET, url, null, "");
|
||||
}
|
||||
|
||||
public HttpOperationResponse execHttpOperation(HttpVerb verb,
|
||||
URL url,
|
||||
byte[] payload,
|
||||
String contentType)
|
||||
throws IOException, AuthenticationException {
|
||||
HttpURLConnection conn = null;
|
||||
HttpOperationResponse outcome = new HttpOperationResponse();
|
||||
int resultCode;
|
||||
byte[] body = null;
|
||||
log.debug("{} {} spnego={}", verb, url, useSpnego);
|
||||
|
||||
boolean doOutput = verb.hasUploadBody();
|
||||
if (doOutput) {
|
||||
Preconditions.checkArgument(payload !=null,
|
||||
"Null payload on a verb which expects one");
|
||||
}
|
||||
try {
|
||||
conn = openConnection(url);
|
||||
conn.setRequestMethod(verb.getVerb());
|
||||
conn.setDoOutput(doOutput);
|
||||
if (doOutput) {
|
||||
conn.setRequestProperty("Content-Type", contentType);
|
||||
}
|
||||
|
||||
// now do the connection
|
||||
conn.connect();
|
||||
|
||||
if (doOutput) {
|
||||
OutputStream output = conn.getOutputStream();
|
||||
IOUtils.write(payload, output);
|
||||
output.close();
|
||||
}
|
||||
|
||||
resultCode = conn.getResponseCode();
|
||||
outcome.lastModified = conn.getLastModified();
|
||||
outcome.contentType = conn.getContentType();
|
||||
outcome.headers = conn.getHeaderFields();
|
||||
InputStream stream = conn.getErrorStream();
|
||||
if (stream == null) {
|
||||
stream = conn.getInputStream();
|
||||
}
|
||||
if (stream != null) {
|
||||
// read into a buffer.
|
||||
body = IOUtils.toByteArray(stream);
|
||||
} else {
|
||||
// no body:
|
||||
log.debug("No body in response");
|
||||
|
||||
}
|
||||
} catch (SSLException e) {
|
||||
throw e;
|
||||
} catch (IOException e) {
|
||||
throw NetUtils.wrapException(url.toString(),
|
||||
url.getPort(), "localhost", 0, e);
|
||||
|
||||
} catch (AuthenticationException e) {
|
||||
throw new AuthenticationException("From " + url + ": " + e, e);
|
||||
|
||||
} finally {
|
||||
if (conn != null) {
|
||||
conn.disconnect();
|
||||
}
|
||||
}
|
||||
uprateFaults(HttpVerb.GET, url.toString(), resultCode, "", body);
|
||||
outcome.responseCode = resultCode;
|
||||
outcome.data = body;
|
||||
return outcome;
|
||||
}
|
||||
|
||||
/**
|
||||
* Uprate error codes 400 and up into faults;
|
||||
* 404 is converted to a {@link NotFoundException},
|
||||
* 401 to {@link ForbiddenException}
|
||||
*
|
||||
* @param verb HTTP Verb used
|
||||
* @param url URL as string
|
||||
* @param resultCode response from the request
|
||||
* @param bodyAsString
|
||||
*@param body optional body of the request @throws IOException if the result was considered a failure
|
||||
*/
|
||||
public static void uprateFaults(HttpVerb verb, String url,
|
||||
int resultCode, String bodyAsString, byte[] body)
|
||||
throws IOException {
|
||||
|
||||
if (resultCode < 400) {
|
||||
//success
|
||||
return;
|
||||
}
|
||||
String msg = verb.toString() +" "+ url;
|
||||
if (resultCode == 404) {
|
||||
throw new NotFoundException(msg);
|
||||
}
|
||||
if (resultCode == 401) {
|
||||
throw new ForbiddenException(msg);
|
||||
}
|
||||
// all other error codes
|
||||
|
||||
// get a string respnse
|
||||
if (bodyAsString == null) {
|
||||
if (body != null && body.length > 0) {
|
||||
bodyAsString = new String(body);
|
||||
} else {
|
||||
bodyAsString = "";
|
||||
}
|
||||
}
|
||||
String message = msg +
|
||||
" failed with exit code " + resultCode
|
||||
+ ", body length " + bodyAsString.length()
|
||||
+ ":\n" + bodyAsString;
|
||||
log.error(message);
|
||||
throw new IOException(message);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -137,8 +137,6 @@ public class MiniZooKeeperCluster extends AbstractService {
|
|||
}
|
||||
|
||||
/**
|
||||
* @param baseDir
|
||||
* @param numZooKeeperServers
|
||||
* @return ClientPort server bound to, -1 if there was a
|
||||
* binding problem and we couldn't pick another port.
|
||||
* @throws IOException
|
||||
|
@ -229,17 +227,6 @@ public class MiniZooKeeperCluster extends AbstractService {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete the basedir
|
||||
*/
|
||||
private void deleteBaseDir() {
|
||||
if (baseDir != null) {
|
||||
baseDir.delete();
|
||||
baseDir = null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
|
||||
|
@ -359,7 +346,7 @@ public class MiniZooKeeperCluster extends AbstractService {
|
|||
try {
|
||||
sock = new Socket("localhost", port);
|
||||
OutputStream outstream = sock.getOutputStream();
|
||||
outstream.write("stat".getBytes());
|
||||
outstream.write("stat".getBytes("UTF-8"));
|
||||
outstream.flush();
|
||||
} finally {
|
||||
IOUtils.closeSocket(sock);
|
||||
|
@ -387,10 +374,10 @@ public class MiniZooKeeperCluster extends AbstractService {
|
|||
BufferedReader reader = null;
|
||||
try {
|
||||
OutputStream outstream = sock.getOutputStream();
|
||||
outstream.write("stat".getBytes());
|
||||
outstream.write("stat".getBytes("UTF-8"));
|
||||
outstream.flush();
|
||||
|
||||
Reader isr = new InputStreamReader(sock.getInputStream());
|
||||
Reader isr = new InputStreamReader(sock.getInputStream(), "UTF-8");
|
||||
reader = new BufferedReader(isr);
|
||||
String line = reader.readLine();
|
||||
if (line != null && line.startsWith("Zookeeper version:")) {
|
||||
|
@ -412,12 +399,4 @@ public class MiniZooKeeperCluster extends AbstractService {
|
|||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public int getClientPort() {
|
||||
return clientPort;
|
||||
}
|
||||
|
||||
public String getZkQuorum() {
|
||||
return zkQuorum;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,14 +43,14 @@ public class ZKIntegration implements Watcher, Closeable {
|
|||
/**
|
||||
* Base path for services
|
||||
*/
|
||||
public static String ZK_SERVICES = "services";
|
||||
public static final String ZK_SERVICES = "services";
|
||||
/**
|
||||
* Base path for all Slider references
|
||||
*/
|
||||
public static String ZK_SLIDER = "slider";
|
||||
public static String ZK_USERS = "users";
|
||||
public static String SVC_SLIDER = "/" + ZK_SERVICES + "/" + ZK_SLIDER;
|
||||
public static String SVC_SLIDER_USERS = SVC_SLIDER + "/" + ZK_USERS;
|
||||
public static final String ZK_SLIDER = "slider";
|
||||
public static final String ZK_USERS = "users";
|
||||
public static final String SVC_SLIDER = "/" + ZK_SERVICES + "/" + ZK_SLIDER;
|
||||
public static final String SVC_SLIDER_USERS = SVC_SLIDER + "/" + ZK_USERS;
|
||||
|
||||
public static final List<String> ZK_USERS_PATH_LIST = new ArrayList<String>();
|
||||
static {
|
||||
|
@ -59,7 +59,7 @@ public class ZKIntegration implements Watcher, Closeable {
|
|||
ZK_USERS_PATH_LIST.add(ZK_USERS);
|
||||
}
|
||||
|
||||
public static int SESSION_TIMEOUT = 30000;
|
||||
public static final int SESSION_TIMEOUT = 30000;
|
||||
protected static final Logger log =
|
||||
LoggerFactory.getLogger(ZKIntegration.class);
|
||||
private ZooKeeper zookeeper;
|
||||
|
@ -279,14 +279,6 @@ public class ZKIntegration implements Watcher, Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocking enum of users
|
||||
* @return an unordered list of clusters under a user
|
||||
*/
|
||||
public List<String> getClusters() throws KeeperException, InterruptedException {
|
||||
return zookeeper.getChildren(userPath, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a node, does not throw an exception if the path is not fond
|
||||
* @param path path to delete
|
||||
|
|
|
@ -735,11 +735,11 @@ public class ProviderUtils implements RoleKeys, SliderKeys {
|
|||
Map<String, Map<String, String>> configurations) {
|
||||
Map<String, String> allConfigs = new HashMap<>();
|
||||
String lookupFormat = "${@//site/%s/%s}";
|
||||
for (String configType : configurations.keySet()) {
|
||||
Map<String, String> configBucket = configurations.get(configType);
|
||||
for (String configName : configBucket.keySet()) {
|
||||
allConfigs.put(String.format(lookupFormat, configType, configName),
|
||||
configBucket.get(configName));
|
||||
for (Map.Entry<String, Map<String, String>> entry : configurations.entrySet()) {
|
||||
Map<String, String> configBucket = entry.getValue();
|
||||
for(Map.Entry<String, String> config: configBucket.entrySet()) {
|
||||
allConfigs.put(String.format(lookupFormat, entry.getKey(), config.getKey()),
|
||||
config.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -758,15 +758,15 @@ public class ProviderUtils implements RoleKeys, SliderKeys {
|
|||
configValue = configValue.replace(lookUpKey, lookUpValue);
|
||||
}
|
||||
}
|
||||
if (!configValue.equals(entry.getValue())) {
|
||||
if (configValue != null && !configValue.equals(entry.getValue())) {
|
||||
finished = false;
|
||||
allConfigs.put(entry.getKey(), configValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (String configType : configurations.keySet()) {
|
||||
Map<String, String> configBucket = configurations.get(configType);
|
||||
for (Map.Entry<String, Map<String, String>> configEntry : configurations
|
||||
.entrySet()) {
|
||||
Map<String, String> configBucket = configEntry.getValue();
|
||||
for (Map.Entry<String, String> entry: configBucket.entrySet()) {
|
||||
String configName = entry.getKey();
|
||||
String configValue = entry.getValue();
|
||||
|
@ -817,7 +817,8 @@ public class ProviderUtils implements RoleKeys, SliderKeys {
|
|||
*/
|
||||
private void addConfsToList(Map<String, String> confMap,
|
||||
Set<String> confList, String prefix, String suffix) {
|
||||
for (String key : confMap.keySet()) {
|
||||
for (Entry<String, String> entry : confMap.entrySet()) {
|
||||
String key = entry.getKey();
|
||||
if (key.startsWith(prefix) && key.endsWith(suffix)) {
|
||||
String confName = key.substring(prefix.length(),
|
||||
key.length() - suffix.length());
|
||||
|
|
|
@ -257,19 +257,20 @@ public class DockerProviderService extends AbstractProviderService implements
|
|||
.getInternalsSnapshot(), null, getClusterName(), clientName,
|
||||
clientName, getAmState());
|
||||
|
||||
for (String configFileDN : configurations.keySet()) {
|
||||
for (Map.Entry<String, Map<String, String>> entry : configurations.entrySet()) {
|
||||
String configFileDN = entry.getKey();
|
||||
String configFileName = appConf.getComponentOpt(clientName,
|
||||
OptionKeys.CONF_FILE_PREFIX + configFileDN + OptionKeys
|
||||
.NAME_SUFFIX, null);
|
||||
String configFileType = appConf.getComponentOpt(clientName,
|
||||
OptionKeys.CONF_FILE_PREFIX + configFileDN + OptionKeys
|
||||
.TYPE_SUFFIX, null);
|
||||
if (configFileName == null && configFileType == null) {
|
||||
if (configFileName == null || configFileType == null) {
|
||||
continue;
|
||||
}
|
||||
ConfigFormat configFormat = ConfigFormat.resolve(configFileType);
|
||||
|
||||
Map<String, String> config = configurations.get(configFileDN);
|
||||
Map<String, String> config = entry.getValue();
|
||||
ConfigUtils.prepConfigForTemplateOutputter(configFormat, config,
|
||||
fileSystem, getClusterName(),
|
||||
new File(configFileName).getName());
|
||||
|
@ -365,9 +366,10 @@ public class DockerProviderService extends AbstractProviderService implements
|
|||
for (Entry<String, String> export : exports.entrySet()) {
|
||||
String value = export.getValue();
|
||||
// replace host names and site properties
|
||||
for (String token : replaceTokens.keySet()) {
|
||||
for (Map.Entry<String, String> entry : replaceTokens.entrySet()) {
|
||||
String token = entry.getKey();
|
||||
if (value.contains(token)) {
|
||||
value = value.replaceAll(Pattern.quote(token), replaceTokens.get(token));
|
||||
value = value.replaceAll(Pattern.quote(token), entry.getValue());
|
||||
}
|
||||
}
|
||||
ExportEntry entry = new ExportEntry();
|
||||
|
|
|
@ -47,21 +47,6 @@ public class BoolMetric implements Metric, Gauge<Integer> {
|
|||
return value.get() ? 1 : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Evaluate from a string. Returns true if the string is considered to match 'true',
|
||||
* false otherwise.
|
||||
* @param s source
|
||||
* @return true if the input parses to an integer other than 0. False if it doesn't parse
|
||||
* or parses to 0.
|
||||
*/
|
||||
public static boolean fromString(String s) {
|
||||
try {
|
||||
return Integer.valueOf(s) != 0;
|
||||
} catch (NumberFormatException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return value.toString();
|
||||
|
|
|
@ -58,11 +58,6 @@ public class MetricsAndMonitoring extends CompositeService {
|
|||
|
||||
private final List<MetricSet> metricSets = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* List of recorded events
|
||||
*/
|
||||
private final List<RecordedEvent> eventHistory = new ArrayList<>(100);
|
||||
|
||||
public static final int EVENT_LIMIT = 1000;
|
||||
|
||||
public MetricRegistry getMetrics() {
|
||||
|
@ -139,26 +134,6 @@ public class MetricsAndMonitoring extends CompositeService {
|
|||
return register(MetricRegistry.name(klass, names), metric);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Add an event (synchronized)
|
||||
* @param event event
|
||||
*/
|
||||
public synchronized void noteEvent(RecordedEvent event) {
|
||||
if (eventHistory.size() > EVENT_LIMIT) {
|
||||
eventHistory.remove(0);
|
||||
}
|
||||
eventHistory.add(event);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clone the event history; blocks for the duration of the copy operation.
|
||||
* @return a new list
|
||||
*/
|
||||
public synchronized List<RecordedEvent> cloneEventHistory() {
|
||||
return new ArrayList<>(eventHistory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a metric set for registering and deregistration on service stop
|
||||
* @param metricSet metric set
|
||||
|
|
|
@ -1,85 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.slider.server.appmaster.management;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Counting;
|
||||
import com.codahale.metrics.Metric;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* This is a counter whose range can be given a min and a max
|
||||
*/
|
||||
public class RangeLimitedCounter implements Metric, Counting {
|
||||
|
||||
private final AtomicLong value;
|
||||
private final long min, max;
|
||||
|
||||
/**
|
||||
* Instantiate
|
||||
* @param val current value
|
||||
* @param min minimum value
|
||||
* @param max max value (or 0 for no max)
|
||||
*/
|
||||
public RangeLimitedCounter(long val, long min, long max) {
|
||||
this.value = new AtomicLong(val);
|
||||
this.min = min;
|
||||
this.max = max;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set to a new value. If below the min, set to the minimum. If the max is non
|
||||
* zero and the value is above that maximum, set it to the maximum instead.
|
||||
* @param val value
|
||||
*/
|
||||
public synchronized void set(long val) {
|
||||
if (val < min) {
|
||||
val = min;
|
||||
} else if (max > 0 && val > max) {
|
||||
val = max;
|
||||
}
|
||||
value.set(val);
|
||||
}
|
||||
|
||||
public void inc() {
|
||||
inc(1);
|
||||
}
|
||||
|
||||
public void dec() {
|
||||
dec(1);
|
||||
}
|
||||
|
||||
public synchronized void inc(int delta) {
|
||||
set(value.get() + delta);
|
||||
}
|
||||
|
||||
public synchronized void dec(int delta) {
|
||||
set(value.get() - delta);
|
||||
}
|
||||
|
||||
public long get() {
|
||||
return value.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCount() {
|
||||
return value.get();
|
||||
}
|
||||
}
|
|
@ -1,58 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.slider.server.appmaster.management;
|
||||
|
||||
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
|
||||
import org.codehaus.jackson.map.annotate.JsonSerialize;
|
||||
|
||||
import java.text.DateFormat;
|
||||
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
|
||||
public class RecordedEvent {
|
||||
private static final DateFormat dateFormat = DateFormat.getDateInstance();
|
||||
public long id;
|
||||
public String name;
|
||||
public long timestamp;
|
||||
public String time;
|
||||
public String category;
|
||||
public String host;
|
||||
public int role;
|
||||
public String text;
|
||||
|
||||
public RecordedEvent() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an event. The timestamp is also converted to a time string
|
||||
* @param id id counter
|
||||
* @param name event name
|
||||
* @param timestamp timestamp. If non-zero, is used to build the {@code time} text field.
|
||||
* @param category even category
|
||||
* @param text arbitrary text
|
||||
*/
|
||||
public RecordedEvent(long id, String name, long timestamp, String category, String text) {
|
||||
this.id = id;
|
||||
this.name = name;
|
||||
this.timestamp = timestamp;
|
||||
this.time = timestamp > 0 ? dateFormat.format(timestamp) : "";
|
||||
this.category = category;
|
||||
this.text = text;
|
||||
}
|
||||
}
|
|
@ -59,7 +59,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.slider.api.proto.RestTypeMarshalling.marshall;
|
||||
import static org.apache.slider.api.types.RestTypeMarshalling.marshall;
|
||||
import static org.apache.slider.server.appmaster.web.rest.RestPaths.*;
|
||||
|
||||
/**
|
||||
|
|
|
@ -253,7 +253,7 @@ public class NodeInstance {
|
|||
new StringBuilder(toString());
|
||||
sb.append("{ ");
|
||||
for (NodeEntry entry : nodeEntries) {
|
||||
sb.append(String.format("\n [%02d] ", entry.rolePriority));
|
||||
sb.append(String.format("%n [%02d] ", entry.rolePriority));
|
||||
sb.append(entry.toString());
|
||||
}
|
||||
sb.append("} ");
|
||||
|
|
|
@ -232,7 +232,7 @@ public class OutstandingRequestTracker {
|
|||
* the most recent one is picked first. This operation <i>does not</i>
|
||||
* change the role history, though it queries it.
|
||||
*/
|
||||
static class newerThan implements Comparator<Container>, Serializable {
|
||||
static class newerThan implements Comparator<Container> {
|
||||
private RoleHistory rh;
|
||||
|
||||
public newerThan(RoleHistory rh) {
|
||||
|
|
|
@ -321,10 +321,6 @@ public final class RoleStatus implements Cloneable, MetricSet {
|
|||
return completed.get();
|
||||
}
|
||||
|
||||
public synchronized void setCompleted(int completed) {
|
||||
this.completed.set(completed);
|
||||
}
|
||||
|
||||
public long incCompleted() {
|
||||
return completed.incrementAndGet();
|
||||
}
|
||||
|
|
|
@ -69,7 +69,6 @@ public class InsecureAmFilter extends AmIpFilter {
|
|||
FilterChain chain) throws IOException, ServletException {
|
||||
rejectNonHttpRequests(req);
|
||||
HttpServletRequest httpReq = (HttpServletRequest) req;
|
||||
HttpServletResponse httpResp = (HttpServletResponse) resp;
|
||||
|
||||
|
||||
String requestURI = httpReq.getRequestURI();
|
||||
|
|
|
@ -48,6 +48,7 @@ import java.net.URLClassLoader;
|
|||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -122,9 +123,9 @@ public class PublisherResource extends AbstractSliderResource {
|
|||
@GET
|
||||
@Path(CLASSPATH)
|
||||
@Produces({MediaType.APPLICATION_JSON})
|
||||
public Set<URL> getAMClassPath() {
|
||||
public List<URL> getAMClassPath() {
|
||||
URL[] urls = ((URLClassLoader) getClass().getClassLoader()).getURLs();
|
||||
return new LinkedHashSet<URL>(Arrays.asList(urls));
|
||||
return Arrays.asList(urls);
|
||||
}
|
||||
|
||||
@GET
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.slider.api.types.ComponentInformation;
|
|||
import org.apache.slider.server.appmaster.state.RoleInstance;
|
||||
import org.apache.slider.server.appmaster.web.WebAppApi;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -163,7 +164,7 @@ public class ContainerStatsBlock extends SliderHamletBlock {
|
|||
protected static <T> Function<Entry<String,T>,Entry<TableContent,T>> toTableContentFunction() {
|
||||
return new Function<Entry<String,T>,Entry<TableContent,T>>() {
|
||||
@Override
|
||||
public Entry<TableContent,T> apply(Entry<String,T> input) {
|
||||
public Entry<TableContent,T> apply(@Nonnull Entry<String,T> input) {
|
||||
return Maps.immutableEntry(new TableContent(input.getKey()), input.getValue());
|
||||
}
|
||||
};
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.Date;
|
|||
* Had better be unserializable at the far end if that is to work.
|
||||
*/
|
||||
public final class ProbeStatus implements Serializable {
|
||||
private static final long serialVersionUID = 165468L;
|
||||
|
||||
private long timestamp;
|
||||
private String timestampText;
|
||||
|
|
|
@ -41,7 +41,6 @@ public abstract class AbstractSliderLaunchedService extends
|
|||
protected AbstractSliderLaunchedService(String name) {
|
||||
super(name);
|
||||
// make sure all the yarn configs get loaded
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
ConfigHelper.registerDeprecatedConfigItems();
|
||||
}
|
||||
|
||||
|
@ -74,7 +73,6 @@ public abstract class AbstractSliderLaunchedService extends
|
|||
throws BadConfigException {
|
||||
|
||||
// push back the slider registry entry if needed
|
||||
String quorum = lookupZKQuorum();
|
||||
RegistryOperations registryWriterService =
|
||||
createRegistryOperationsInstance();
|
||||
deployChildService(registryWriterService);
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.io.BufferedReader;
|
|||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
|
@ -527,9 +528,9 @@ public class LongLivedProcess implements Runnable {
|
|||
StringBuilder errorLine = new StringBuilder(LINE_LENGTH);
|
||||
try {
|
||||
errReader = new BufferedReader(
|
||||
new InputStreamReader(process.getErrorStream()));
|
||||
new InputStreamReader(process.getErrorStream(), "UTF-8"));
|
||||
outReader = new BufferedReader(
|
||||
new InputStreamReader(process.getInputStream()));
|
||||
new InputStreamReader(process.getInputStream(), "UTF-8"));
|
||||
while (!finished.get()) {
|
||||
boolean processed = false;
|
||||
if (readAnyLine(errReader, errorLine, LINE_LENGTH)) {
|
||||
|
|
|
@ -153,7 +153,7 @@ public class TestSliderUtils {
|
|||
@Test
|
||||
public void testWrite() throws IOException {
|
||||
File testWriteFile = folder.newFile("testWrite");
|
||||
SliderUtils.write(testWriteFile, "test".getBytes("UTF-8"), true);
|
||||
SliderUtils.write(testWriteFile, "test".getBytes("UTF-8"));
|
||||
Assert.assertTrue(FileUtils.readFileToString(testWriteFile, "UTF-8").equals("test"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,395 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.slider.test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.zookeeper.server.NIOServerCnxnFactory;
|
||||
import org.apache.zookeeper.server.ZooKeeperServer;
|
||||
import org.apache.zookeeper.server.persistence.FileTxnLog;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.io.Reader;
|
||||
import java.net.BindException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
|
||||
/**
|
||||
* This is a version of the HBase ZK cluster cut out to be standalone
|
||||
*/
|
||||
public class MiniZooKeeperCluster {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
MiniZooKeeperCluster.class);
|
||||
|
||||
private static final int TICK_TIME = 2000;
|
||||
private static final int CONNECTION_TIMEOUT = 30000;
|
||||
public static final int MAX_CLIENT_CONNECTIONS = 1000;
|
||||
|
||||
private boolean started;
|
||||
|
||||
/** The default port. If zero, we use a random port. */
|
||||
private int defaultClientPort = 0;
|
||||
|
||||
private int clientPort;
|
||||
|
||||
private List<NIOServerCnxnFactory> standaloneServerFactoryList;
|
||||
private List<ZooKeeperServer> zooKeeperServers;
|
||||
private List<Integer> clientPortList;
|
||||
|
||||
private int activeZKServerIndex;
|
||||
private int tickTime = 0;
|
||||
|
||||
private Configuration configuration;
|
||||
|
||||
public MiniZooKeeperCluster() {
|
||||
this(new Configuration());
|
||||
}
|
||||
|
||||
public MiniZooKeeperCluster(Configuration configuration) {
|
||||
this.started = false;
|
||||
this.configuration = configuration;
|
||||
activeZKServerIndex = -1;
|
||||
zooKeeperServers = new ArrayList<ZooKeeperServer>();
|
||||
clientPortList = new ArrayList<Integer>();
|
||||
standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
|
||||
}
|
||||
|
||||
public void setDefaultClientPort(int clientPort) {
|
||||
if (clientPort <= 0) {
|
||||
throw new IllegalArgumentException("Invalid default ZK client port: "
|
||||
+ clientPort);
|
||||
}
|
||||
this.defaultClientPort = clientPort;
|
||||
}
|
||||
|
||||
/**
|
||||
* Selects a ZK client port. Returns the default port if specified.
|
||||
* Otherwise, returns a random port. The random port is selected from the
|
||||
* range between 49152 to 65535. These ports cannot be registered with IANA
|
||||
* and are intended for dynamic allocation (see http://bit.ly/dynports).
|
||||
*/
|
||||
private int selectClientPort() {
|
||||
if (defaultClientPort > 0) {
|
||||
return defaultClientPort;
|
||||
}
|
||||
return 0xc000 + new Random().nextInt(0x3f00);
|
||||
}
|
||||
|
||||
public void setTickTime(int tickTime) {
|
||||
this.tickTime = tickTime;
|
||||
}
|
||||
|
||||
public int getBackupZooKeeperServerNum() {
|
||||
return zooKeeperServers.size() - 1;
|
||||
}
|
||||
|
||||
public int getZooKeeperServerNum() {
|
||||
return zooKeeperServers.size();
|
||||
}
|
||||
|
||||
// / XXX: From o.a.zk.t.ClientBase
|
||||
private static void setupTestEnv() {
|
||||
// during the tests we run with 100K prealloc in the logs.
|
||||
// on windows systems prealloc of 64M was seen to take ~15seconds
|
||||
// resulting in test failure (client timeout on first session).
|
||||
// set env and directly in order to handle static init/gc issues
|
||||
System.setProperty("zookeeper.preAllocSize", "100");
|
||||
FileTxnLog.setPreallocSize(100 * 1024);
|
||||
}
|
||||
|
||||
public int startup(File baseDir) throws IOException, InterruptedException {
|
||||
return startup(baseDir, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param baseDir
|
||||
* @param numZooKeeperServers
|
||||
* @return ClientPort server bound to, -1 if there was a
|
||||
* binding problem and we couldn't pick another port.
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public int startup(File baseDir, int numZooKeeperServers) throws IOException,
|
||||
InterruptedException {
|
||||
if (numZooKeeperServers <= 0)
|
||||
return -1;
|
||||
|
||||
setupTestEnv();
|
||||
shutdown();
|
||||
|
||||
int tentativePort = selectClientPort();
|
||||
|
||||
// running all the ZK servers
|
||||
for (int i = 0; i < numZooKeeperServers; i++) {
|
||||
File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
|
||||
recreateDir(dir);
|
||||
int tickTimeToUse;
|
||||
if (this.tickTime > 0) {
|
||||
tickTimeToUse = this.tickTime;
|
||||
} else {
|
||||
tickTimeToUse = TICK_TIME;
|
||||
}
|
||||
ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
|
||||
NIOServerCnxnFactory standaloneServerFactory;
|
||||
while (true) {
|
||||
try {
|
||||
standaloneServerFactory = new NIOServerCnxnFactory();
|
||||
standaloneServerFactory.configure(
|
||||
new InetSocketAddress(tentativePort),
|
||||
MAX_CLIENT_CONNECTIONS
|
||||
);
|
||||
} catch (BindException e) {
|
||||
LOG.debug("Failed binding ZK Server to client port: " +
|
||||
tentativePort, e);
|
||||
// We're told to use some port but it's occupied, fail
|
||||
if (defaultClientPort > 0) return -1;
|
||||
// This port is already in use, try to use another.
|
||||
tentativePort = selectClientPort();
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// Start up this ZK server
|
||||
standaloneServerFactory.startup(server);
|
||||
if (!waitForServerUp(tentativePort, CONNECTION_TIMEOUT)) {
|
||||
throw new IOException("Waiting for startup of standalone server");
|
||||
}
|
||||
|
||||
// We have selected this port as a client port.
|
||||
clientPortList.add(tentativePort);
|
||||
standaloneServerFactoryList.add(standaloneServerFactory);
|
||||
zooKeeperServers.add(server);
|
||||
tentativePort++; //for the next server
|
||||
}
|
||||
|
||||
// set the first one to be active ZK; Others are backups
|
||||
activeZKServerIndex = 0;
|
||||
started = true;
|
||||
clientPort = clientPortList.get(activeZKServerIndex);
|
||||
LOG.info("Started MiniZK Cluster and connect 1 ZK server " +
|
||||
"on client port: " + clientPort);
|
||||
return clientPort;
|
||||
}
|
||||
|
||||
private void recreateDir(File dir) throws IOException {
|
||||
if (dir.exists()) {
|
||||
if (!FileUtil.fullyDelete(dir)) {
|
||||
throw new IOException("Could not delete zk base directory: " + dir);
|
||||
}
|
||||
}
|
||||
try {
|
||||
dir.mkdirs();
|
||||
} catch (SecurityException e) {
|
||||
throw new IOException("creating dir: " + dir, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws IOException
|
||||
*/
|
||||
public void shutdown() throws IOException {
|
||||
if (!started) {
|
||||
return;
|
||||
}
|
||||
|
||||
// shut down all the zk servers
|
||||
for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
|
||||
NIOServerCnxnFactory standaloneServerFactory =
|
||||
standaloneServerFactoryList.get(i);
|
||||
int clientPort = clientPortList.get(i);
|
||||
|
||||
standaloneServerFactory.shutdown();
|
||||
if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
|
||||
throw new IOException("Waiting for shutdown of standalone server");
|
||||
}
|
||||
}
|
||||
for (ZooKeeperServer zkServer : zooKeeperServers) {
|
||||
//explicitly close ZKDatabase since ZookeeperServer does not close them
|
||||
zkServer.getZKDatabase().close();
|
||||
}
|
||||
|
||||
// clear everything
|
||||
started = false;
|
||||
activeZKServerIndex = 0;
|
||||
standaloneServerFactoryList.clear();
|
||||
clientPortList.clear();
|
||||
zooKeeperServers.clear();
|
||||
|
||||
LOG.info("Shutdown MiniZK cluster with all ZK servers");
|
||||
}
|
||||
|
||||
/**@return clientPort return clientPort if there is another ZK backup can run
|
||||
* when killing the current active; return -1, if there is no backups.
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public int killCurrentActiveZooKeeperServer() throws IOException,
|
||||
InterruptedException {
|
||||
if (!started || activeZKServerIndex < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Shutdown the current active one
|
||||
NIOServerCnxnFactory standaloneServerFactory =
|
||||
standaloneServerFactoryList.get(activeZKServerIndex);
|
||||
int clientPort = clientPortList.get(activeZKServerIndex);
|
||||
|
||||
standaloneServerFactory.shutdown();
|
||||
if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
|
||||
throw new IOException("Waiting for shutdown of standalone server");
|
||||
}
|
||||
|
||||
zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
|
||||
|
||||
// remove the current active zk server
|
||||
standaloneServerFactoryList.remove(activeZKServerIndex);
|
||||
clientPortList.remove(activeZKServerIndex);
|
||||
zooKeeperServers.remove(activeZKServerIndex);
|
||||
LOG.info("Kill the current active ZK servers in the cluster " +
|
||||
"on client port: " + clientPort);
|
||||
|
||||
if (standaloneServerFactoryList.size() == 0) {
|
||||
// there is no backup servers;
|
||||
return -1;
|
||||
}
|
||||
clientPort = clientPortList.get(activeZKServerIndex);
|
||||
LOG.info("Activate a backup zk server in the cluster " +
|
||||
"on client port: " + clientPort);
|
||||
// return the next back zk server's port
|
||||
return clientPort;
|
||||
}
|
||||
|
||||
/**
|
||||
* Kill one back up ZK servers
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public void killOneBackupZooKeeperServer() throws IOException,
|
||||
InterruptedException {
|
||||
if (!started || activeZKServerIndex < 0 ||
|
||||
standaloneServerFactoryList.size() <= 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
int backupZKServerIndex = activeZKServerIndex + 1;
|
||||
// Shutdown the current active one
|
||||
NIOServerCnxnFactory standaloneServerFactory =
|
||||
standaloneServerFactoryList.get(backupZKServerIndex);
|
||||
int clientPort = clientPortList.get(backupZKServerIndex);
|
||||
|
||||
standaloneServerFactory.shutdown();
|
||||
if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
|
||||
throw new IOException("Waiting for shutdown of standalone server");
|
||||
}
|
||||
|
||||
zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
|
||||
|
||||
// remove this backup zk server
|
||||
standaloneServerFactoryList.remove(backupZKServerIndex);
|
||||
clientPortList.remove(backupZKServerIndex);
|
||||
zooKeeperServers.remove(backupZKServerIndex);
|
||||
LOG.info("Kill one backup ZK servers in the cluster " +
|
||||
"on client port: " + clientPort);
|
||||
}
|
||||
|
||||
// XXX: From o.a.zk.t.ClientBase
|
||||
private static boolean waitForServerDown(int port, long timeout) {
|
||||
long start = System.currentTimeMillis();
|
||||
while (true) {
|
||||
try {
|
||||
Socket sock = new Socket("localhost", port);
|
||||
try {
|
||||
OutputStream outstream = sock.getOutputStream();
|
||||
outstream.write("stat".getBytes());
|
||||
outstream.flush();
|
||||
} finally {
|
||||
sock.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (System.currentTimeMillis() > start + timeout) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(250);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// XXX: From o.a.zk.t.ClientBase
|
||||
private static boolean waitForServerUp(int port, long timeout) {
|
||||
long start = System.currentTimeMillis();
|
||||
while (true) {
|
||||
try {
|
||||
Socket sock = new Socket("localhost", port);
|
||||
BufferedReader reader = null;
|
||||
try {
|
||||
OutputStream outstream = sock.getOutputStream();
|
||||
outstream.write("stat".getBytes());
|
||||
outstream.flush();
|
||||
|
||||
Reader isr = new InputStreamReader(sock.getInputStream());
|
||||
reader = new BufferedReader(isr);
|
||||
String line = reader.readLine();
|
||||
if (line != null && line.startsWith("Zookeeper version:")) {
|
||||
return true;
|
||||
}
|
||||
} finally {
|
||||
sock.close();
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// ignore as this is expected
|
||||
LOG.info("server localhost:" + port + " not up " + e);
|
||||
}
|
||||
|
||||
if (System.currentTimeMillis() > start + timeout) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(250);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public int getClientPort() {
|
||||
return clientPort;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue