HDFS-12772. RBF: Federation Router State State Store internal API. Contributed by Inigo Goiri.

This commit is contained in:
Inigo Goiri 2018-01-23 19:15:44 -08:00
parent d95c13774e
commit 95743c672e
23 changed files with 1644 additions and 0 deletions

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.util.VersionInfo;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@ -105,6 +106,25 @@ public final class FederationUtil {
return ret;
}
/**
* Fetch the compile timestamp for this jar.
*
* @return Date compiled.
*/
public static String getBuildVersion() {
return VersionInfo.getBuildVersion();
}
/**
* Fetch the build/compile information for this jar.
*
* @return String Compilation info.
*/
public static String getCompileInfo() {
return VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from "
+ VersionInfo.getBranch();
}
/**
* Create an instance of an interface with a constructor using a context.
*

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
/**
* States of the Router.
*/
public enum RouterServiceState {
NONE,
INITIALIZING,
SAFEMODE,
RUNNING,
STOPPING,
SHUTDOWN,
EXPIRED;
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
/**
* Management API for
* {@link org.apache.hadoop.hdfs.server.federation.store.records.RouterState
* RouterState} records in the state store. Accesses the data store via the
* {@link org.apache.hadoop.hdfs.server.federation.store.driver.
* StateStoreDriver StateStoreDriver} interface. No data is cached.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class RouterStore extends CachedRecordStore<RouterState> {
public RouterStore(StateStoreDriver driver) {
super(RouterState.class, driver, true);
}
/**
* Fetches the current router state object.
*
* @param request Fully populated request object.
* @return The matching router record or null if none exists.
* @throws IOException Throws exception if unable to query the data store or
* if more than one matching record is found.
*/
public abstract GetRouterRegistrationResponse getRouterRegistration(
GetRouterRegistrationRequest request) throws IOException;
/**
* Fetches all router status objects.
*
* @param request Fully populated request object.
* @return List of Router records present in the data store.
* @throws IOException Throws exception if unable to query the data store
*/
public abstract GetRouterRegistrationsResponse getRouterRegistrations(
GetRouterRegistrationsRequest request) throws IOException;
/**
* Update the state of this router in the State Store.
*
* @param request Fully populated request object.
* @return True if the update was successfully recorded, false otherwise.
* @throws IOException Throws exception if unable to query the data store
*/
public abstract RouterHeartbeatResponse routerHeartbeat(
RouterHeartbeatRequest request) throws IOException;
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.impl;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
/**
* Implementation of the {@link RouterStore} state store API.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RouterStoreImpl extends RouterStore {
public RouterStoreImpl(StateStoreDriver driver) {
super(driver);
}
@Override
public GetRouterRegistrationResponse getRouterRegistration(
GetRouterRegistrationRequest request) throws IOException {
final RouterState partial = RouterState.newInstance();
partial.setAddress(request.getRouterId());
final Query<RouterState> query = new Query<RouterState>(partial);
RouterState record = getDriver().get(getRecordClass(), query);
if (record != null) {
overrideExpiredRecord(record);
}
GetRouterRegistrationResponse response =
GetRouterRegistrationResponse.newInstance();
response.setRouter(record);
return response;
}
@Override
public GetRouterRegistrationsResponse getRouterRegistrations(
GetRouterRegistrationsRequest request) throws IOException {
// Get all values from the cache
QueryResult<RouterState> recordsAndTimeStamp =
getCachedRecordsAndTimeStamp();
List<RouterState> records = recordsAndTimeStamp.getRecords();
long timestamp = recordsAndTimeStamp.getTimestamp();
// Generate response
GetRouterRegistrationsResponse response =
GetRouterRegistrationsResponse.newInstance();
response.setRouters(records);
response.setTimestamp(timestamp);
return response;
}
@Override
public RouterHeartbeatResponse routerHeartbeat(RouterHeartbeatRequest request)
throws IOException {
RouterState record = request.getRouter();
boolean status = getDriver().put(record, true, false);
RouterHeartbeatResponse response =
RouterHeartbeatResponse.newInstance(status);
return response;
}
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
/**
* API request for retrieving a single router registration present in the state
* store.
*/
public abstract class GetRouterRegistrationRequest {
public static GetRouterRegistrationRequest newInstance() {
return StateStoreSerializer.newRecord(GetRouterRegistrationRequest.class);
}
public static GetRouterRegistrationRequest newInstance(String routerId) {
GetRouterRegistrationRequest request = newInstance();
request.setRouterId(routerId);
return request;
}
@Public
@Unstable
public abstract String getRouterId();
@Public
@Unstable
public abstract void setRouterId(String routerId);
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
/**
* API response for retrieving a single router registration present in the state
* store.
*/
public abstract class GetRouterRegistrationResponse {
public static GetRouterRegistrationResponse newInstance() throws IOException {
return StateStoreSerializer.newRecord(GetRouterRegistrationResponse.class);
}
@Public
@Unstable
public abstract RouterState getRouter() throws IOException;
@Public
@Unstable
public abstract void setRouter(RouterState router) throws IOException;
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
/**
* API request for retrieving a all non-expired router registrations present in
* the state store.
*/
public abstract class GetRouterRegistrationsRequest {
public static GetRouterRegistrationsRequest newInstance() throws IOException {
return StateStoreSerializer.newRecord(GetRouterRegistrationsRequest.class);
}
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
/**
* API response for retrieving a all non-expired router registrations present in
* the state store.
*/
public abstract class GetRouterRegistrationsResponse {
public static GetRouterRegistrationsResponse newInstance()
throws IOException {
return StateStoreSerializer.newRecord(GetRouterRegistrationsResponse.class);
}
@Public
@Unstable
public abstract List<RouterState> getRouters() throws IOException;
@Public
@Unstable
public abstract void setRouters(List<RouterState> routers)
throws IOException;
@Public
@Unstable
public abstract long getTimestamp();
@Public
@Unstable
public abstract void setTimestamp(long time);
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
/**
* API request for registering a router with the state store.
*/
public abstract class RouterHeartbeatRequest {
public static RouterHeartbeatRequest newInstance() throws IOException {
return StateStoreSerializer.newRecord(RouterHeartbeatRequest.class);
}
public static RouterHeartbeatRequest newInstance(RouterState router)
throws IOException {
RouterHeartbeatRequest request = newInstance();
request.setRouter(router);
return request;
}
@Public
@Unstable
public abstract RouterState getRouter() throws IOException;
@Public
@Unstable
public abstract void setRouter(RouterState routerState);
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
/**
* API response for registering a router with the state store.
*/
public abstract class RouterHeartbeatResponse {
public static RouterHeartbeatResponse newInstance() throws IOException {
return StateStoreSerializer.newRecord(RouterHeartbeatResponse.class);
}
public static RouterHeartbeatResponse newInstance(boolean status)
throws IOException {
RouterHeartbeatResponse response = newInstance();
response.setStatus(status);
return response;
}
@Public
@Unstable
public abstract boolean getStatus();
@Public
@Unstable
public abstract void setStatus(boolean result);
}

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationRequestProto.Builder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationRequestProtoOrBuilder;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the state store API object
* GetRouterRegistrationRequest.
*/
public class GetRouterRegistrationRequestPBImpl
extends GetRouterRegistrationRequest implements PBRecord {
private FederationProtocolPBTranslator<GetRouterRegistrationRequestProto,
Builder, GetRouterRegistrationRequestProtoOrBuilder> translator =
new FederationProtocolPBTranslator<GetRouterRegistrationRequestProto,
Builder, GetRouterRegistrationRequestProtoOrBuilder>(
GetRouterRegistrationRequestProto.class);
public GetRouterRegistrationRequestPBImpl() {
}
@Override
public GetRouterRegistrationRequestProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public String getRouterId() {
return this.translator.getProtoOrBuilder().getRouterId();
}
@Override
public void setRouterId(String routerId) {
this.translator.getBuilder().setRouterId(routerId);
}
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationResponseProto.Builder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationResponseProtoOrBuilder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterRecordProto;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.RouterStatePBImpl;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the state store API object
* GetRouterRegistrationResponse.
*/
public class GetRouterRegistrationResponsePBImpl
extends GetRouterRegistrationResponse implements PBRecord {
private FederationProtocolPBTranslator<GetRouterRegistrationResponseProto,
Builder, GetRouterRegistrationResponseProtoOrBuilder> translator =
new FederationProtocolPBTranslator<
GetRouterRegistrationResponseProto, Builder,
GetRouterRegistrationResponseProtoOrBuilder>(
GetRouterRegistrationResponseProto.class);
public GetRouterRegistrationResponsePBImpl() {
}
@Override
public GetRouterRegistrationResponseProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message protocol) {
this.translator.setProto(protocol);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public RouterState getRouter() {
RouterRecordProto proto = this.translator.getProtoOrBuilder().getRouter();
return new RouterStatePBImpl(proto);
}
@Override
public void setRouter(RouterState router) {
if (router instanceof RouterStatePBImpl) {
RouterStatePBImpl routerPB = (RouterStatePBImpl)router;
RouterRecordProto routerProto = routerPB.getProto();
this.translator.getBuilder().setRouter(routerProto);
}
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationsRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationsRequestProto.Builder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationsRequestProtoOrBuilder;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the state store API object
* GetRouterRegistrationsRequest.
*/
public class GetRouterRegistrationsRequestPBImpl
extends GetRouterRegistrationsRequest implements PBRecord {
private FederationProtocolPBTranslator<GetRouterRegistrationsRequestProto,
Builder, GetRouterRegistrationsRequestProtoOrBuilder> translator =
new FederationProtocolPBTranslator<
GetRouterRegistrationsRequestProto, Builder,
GetRouterRegistrationsRequestProtoOrBuilder>(
GetRouterRegistrationsRequestProto.class);
public GetRouterRegistrationsRequestPBImpl() {
}
@Override
public GetRouterRegistrationsRequestProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
}

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationsResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationsResponseProto.Builder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetRouterRegistrationsResponseProtoOrBuilder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterRecordProto;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.RouterStatePBImpl;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the state store API object
* GetRouterRegistrationsResponse.
*/
public class GetRouterRegistrationsResponsePBImpl
extends GetRouterRegistrationsResponse implements PBRecord {
private FederationProtocolPBTranslator<GetRouterRegistrationsResponseProto,
Builder, GetRouterRegistrationsResponseProtoOrBuilder> translator =
new FederationProtocolPBTranslator<
GetRouterRegistrationsResponseProto, Builder,
GetRouterRegistrationsResponseProtoOrBuilder>(
GetRouterRegistrationsResponseProto.class);
public GetRouterRegistrationsResponsePBImpl() {
}
@Override
public GetRouterRegistrationsResponseProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public List<RouterState> getRouters() throws IOException {
List<RouterState> ret = new ArrayList<RouterState>();
List<RouterRecordProto> memberships =
this.translator.getProtoOrBuilder().getRoutersList();
for (RouterRecordProto memberProto : memberships) {
RouterState membership = new RouterStatePBImpl(memberProto);
ret.add(membership);
}
return ret;
}
@Override
public void setRouters(List<RouterState> records) throws IOException {
this.translator.getBuilder().clearRouters();
for (RouterState router : records) {
if (router instanceof RouterStatePBImpl) {
RouterStatePBImpl routerPB = (RouterStatePBImpl) router;
this.translator.getBuilder().addRouters(routerPB.getProto());
}
}
}
@Override
public long getTimestamp() {
return this.translator.getProtoOrBuilder().getTimestamp();
}
@Override
public void setTimestamp(long time) {
this.translator.getBuilder().setTimestamp(time);
}
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterHeartbeatRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterHeartbeatRequestProto.Builder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterHeartbeatRequestProtoOrBuilder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterRecordProto;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.RouterStatePBImpl;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the state store API object
* RouterHeartbeatRequest.
*/
public class RouterHeartbeatRequestPBImpl extends RouterHeartbeatRequest
implements PBRecord {
private FederationProtocolPBTranslator<RouterHeartbeatRequestProto, Builder,
RouterHeartbeatRequestProtoOrBuilder> translator =
new FederationProtocolPBTranslator<RouterHeartbeatRequestProto,
Builder, RouterHeartbeatRequestProtoOrBuilder>(
RouterHeartbeatRequestProto.class);
public RouterHeartbeatRequestPBImpl() {
}
@Override
public RouterHeartbeatRequestProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public RouterState getRouter() throws IOException {
RouterRecordProto routerProto =
this.translator.getProtoOrBuilder().getRouter();
return new RouterStatePBImpl(routerProto);
}
@Override
public void setRouter(RouterState routerState) {
if (routerState instanceof RouterStatePBImpl) {
RouterStatePBImpl routerStatePB = (RouterStatePBImpl)routerState;
this.translator.getBuilder().setRouter(routerStatePB.getProto());
}
}
}

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterHeartbeatResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterHeartbeatResponseProto.Builder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterHeartbeatResponseProtoOrBuilder;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the state store API object
* RouterHeartbeatResponse.
*/
public class RouterHeartbeatResponsePBImpl extends RouterHeartbeatResponse
implements PBRecord {
private FederationProtocolPBTranslator<RouterHeartbeatResponseProto, Builder,
RouterHeartbeatResponseProtoOrBuilder> translator =
new FederationProtocolPBTranslator<RouterHeartbeatResponseProto,
Builder, RouterHeartbeatResponseProtoOrBuilder>(
RouterHeartbeatResponseProto.class);
public RouterHeartbeatResponsePBImpl() {
}
@Override
public RouterHeartbeatResponseProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public boolean getStatus() {
return this.translator.getProtoOrBuilder().getStatus();
}
@Override
public void setStatus(boolean result) {
this.translator.getBuilder().setStatus(result);
}
}

View File

@ -0,0 +1,169 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.records;
import java.io.IOException;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Entry to log the state of a
* {@link org.apache.hadoop.hdfs.server.federation.router.Router Router} in the
* {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService
* FederationStateStoreService}.
*/
public abstract class RouterState extends BaseRecord {
private static final Logger LOG = LoggerFactory.getLogger(RouterState.class);
/** Expiration time in ms for this entry. */
private static long expirationMs;
/**
* Constructors.
*/
public RouterState() {
super();
}
public static RouterState newInstance() {
RouterState record = StateStoreSerializer.newRecord(RouterState.class);
record.init();
return record;
}
public static RouterState newInstance(String addr, long startTime,
RouterServiceState status) {
RouterState record = newInstance();
record.setDateStarted(startTime);
record.setAddress(addr);
record.setStatus(status);
record.setCompileInfo(FederationUtil.getCompileInfo());
record.setBuildVersion(FederationUtil.getBuildVersion());
return record;
}
public abstract void setAddress(String address);
public abstract void setDateStarted(long dateStarted);
public abstract String getAddress();
public abstract StateStoreVersion getStateStoreVersion() throws IOException;
public abstract void setStateStoreVersion(StateStoreVersion version);
public abstract RouterServiceState getStatus();
public abstract void setStatus(RouterServiceState newStatus);
public abstract String getBuildVersion();
public abstract void setBuildVersion(String version);
public abstract String getCompileInfo();
public abstract void setCompileInfo(String info);
public abstract long getDateStarted();
/**
* Get the identifier for the Router. It uses the address.
*
* @return Identifier for the Router.
*/
public String getRouterId() {
return getAddress();
}
@Override
public boolean like(BaseRecord o) {
if (o instanceof RouterState) {
RouterState other = (RouterState)o;
if (getAddress() != null &&
!getAddress().equals(other.getAddress())) {
return false;
}
if (getStatus() != null &&
!getStatus().equals(other.getStatus())) {
return false;
}
return true;
}
return false;
}
@Override
public String toString() {
return getAddress() + " -> " + getStatus() + "," + getBuildVersion();
}
@Override
public SortedMap<String, String> getPrimaryKeys() {
SortedMap<String, String> map = new TreeMap<>();
map.put("address", getAddress());
return map;
}
@Override
public boolean validate() {
boolean ret = super.validate();
if ((getAddress() == null || getAddress().length() == 0) &&
getStatus() != RouterServiceState.INITIALIZING) {
LOG.error("Invalid router entry, no address specified {}", this);
ret = false;
}
return ret;
}
@Override
public int compareTo(BaseRecord other) {
if (other == null) {
return -1;
} else if (other instanceof RouterState) {
RouterState router = (RouterState) other;
return this.getAddress().compareTo(router.getAddress());
} else {
return super.compareTo(other);
}
}
@Override
public boolean checkExpired(long currentTime) {
if (super.checkExpired(currentTime)) {
setStatus(RouterServiceState.EXPIRED);
return true;
}
return false;
}
@Override
public long getExpirationMs() {
return RouterState.expirationMs;
}
public static void setExpirationMs(long time) {
RouterState.expirationMs = time;
}
}

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.records;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
/**
* Entry to track the version of the State Store data stored in the State Store
* by a Router.
*/
public abstract class StateStoreVersion extends BaseRecord {
public static StateStoreVersion newInstance() {
return StateStoreSerializer.newRecord(StateStoreVersion.class);
}
public static StateStoreVersion newInstance(long membershipVersion,
long mountTableVersion) {
StateStoreVersion record = newInstance();
record.setMembershipVersion(membershipVersion);
record.setMountTableVersion(mountTableVersion);
return record;
}
public abstract long getMembershipVersion();
public abstract void setMembershipVersion(long version);
public abstract long getMountTableVersion();
public abstract void setMountTableVersion(long version);
@Override
public SortedMap<String, String> getPrimaryKeys() {
// This record is not stored directly, no key needed
SortedMap<String, String> map = new TreeMap<String, String>();
return map;
}
@Override
public long getExpirationMs() {
// This record is not stored directly, no expiration needed
return -1;
}
@Override
public void setDateModified(long time) {
// We don't store this record directly
}
@Override
public long getDateModified() {
// We don't store this record directly
return 0;
}
@Override
public void setDateCreated(long time) {
// We don't store this record directly
}
@Override
public long getDateCreated() {
// We don't store this record directly
return 0;
}
@Override
public String toString() {
return "Membership: " + getMembershipVersion() +
" Mount Table: " + getMountTableVersion();
}
}

View File

@ -0,0 +1,202 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.records.impl.pb;
import java.io.IOException;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterRecordProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterRecordProto.Builder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterRecordProtoOrBuilder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.StateStoreVersionRecordProto;
import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.FederationProtocolPBTranslator;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the RouterState record.
*/
public class RouterStatePBImpl extends RouterState implements PBRecord {
private FederationProtocolPBTranslator<RouterRecordProto, Builder,
RouterRecordProtoOrBuilder> translator =
new FederationProtocolPBTranslator<RouterRecordProto, Builder,
RouterRecordProtoOrBuilder>(RouterRecordProto.class);
public RouterStatePBImpl() {
}
public RouterStatePBImpl(RouterRecordProto proto) {
this.translator.setProto(proto);
}
@Override
public RouterRecordProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public void setAddress(String address) {
RouterRecordProto.Builder builder = this.translator.getBuilder();
if (address == null) {
builder.clearAddress();
} else {
builder.setAddress(address);
}
}
@Override
public String getAddress() {
RouterRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
if (!proto.hasAddress()) {
return null;
}
return proto.getAddress();
}
@Override
public void setStateStoreVersion(StateStoreVersion version) {
RouterRecordProto.Builder builder = this.translator.getBuilder();
if (version instanceof StateStoreVersionPBImpl) {
StateStoreVersionPBImpl versionPB = (StateStoreVersionPBImpl)version;
StateStoreVersionRecordProto versionProto =
(StateStoreVersionRecordProto)versionPB.getProto();
builder.setStateStoreVersion(versionProto);
} else {
builder.clearStateStoreVersion();
}
}
@Override
public StateStoreVersion getStateStoreVersion() throws IOException {
RouterRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
if (!proto.hasStateStoreVersion()) {
return null;
}
StateStoreVersionRecordProto versionProto = proto.getStateStoreVersion();
StateStoreVersion version =
StateStoreSerializer.newRecord(StateStoreVersion.class);
if (version instanceof StateStoreVersionPBImpl) {
StateStoreVersionPBImpl versionPB = (StateStoreVersionPBImpl)version;
versionPB.setProto(versionProto);
return versionPB;
} else {
throw new IOException("Cannot get State Store version");
}
}
@Override
public RouterServiceState getStatus() {
RouterRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
if (!proto.hasStatus()) {
return null;
}
return RouterServiceState.valueOf(proto.getStatus());
}
@Override
public void setStatus(RouterServiceState newStatus) {
RouterRecordProto.Builder builder = this.translator.getBuilder();
if (newStatus == null) {
builder.clearStatus();
} else {
builder.setStatus(newStatus.toString());
}
}
@Override
public String getBuildVersion() {
RouterRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
if (!proto.hasBuildVersion()) {
return null;
}
return proto.getBuildVersion();
}
@Override
public void setBuildVersion(String version) {
RouterRecordProto.Builder builder = this.translator.getBuilder();
if (version == null) {
builder.clearBuildVersion();
} else {
builder.setBuildVersion(version);
}
}
@Override
public String getCompileInfo() {
RouterRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
if (!proto.hasCompileInfo()) {
return null;
}
return proto.getCompileInfo();
}
@Override
public void setCompileInfo(String info) {
RouterRecordProto.Builder builder = this.translator.getBuilder();
if (info == null) {
builder.clearCompileInfo();
} else {
builder.setCompileInfo(info);
}
}
@Override
public void setDateStarted(long dateStarted) {
this.translator.getBuilder().setDateStarted(dateStarted);
}
@Override
public long getDateStarted() {
return this.translator.getProtoOrBuilder().getDateStarted();
}
@Override
public void setDateModified(long time) {
this.translator.getBuilder().setDateModified(time);
}
@Override
public long getDateModified() {
return this.translator.getProtoOrBuilder().getDateModified();
}
@Override
public void setDateCreated(long time) {
this.translator.getBuilder().setDateCreated(time);
}
@Override
public long getDateCreated() {
return this.translator.getProtoOrBuilder().getDateCreated();
}
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.records.impl.pb;
import java.io.IOException;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.StateStoreVersionRecordProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.StateStoreVersionRecordProto.Builder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.StateStoreVersionRecordProtoOrBuilder;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.FederationProtocolPBTranslator;
import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the StateStoreVersion record.
*/
public class StateStoreVersionPBImpl extends StateStoreVersion
implements PBRecord {
private FederationProtocolPBTranslator<StateStoreVersionRecordProto, Builder,
StateStoreVersionRecordProtoOrBuilder> translator =
new FederationProtocolPBTranslator<StateStoreVersionRecordProto,
Builder, StateStoreVersionRecordProtoOrBuilder>(
StateStoreVersionRecordProto.class);
public StateStoreVersionPBImpl() {
}
@Override
public StateStoreVersionRecordProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public long getMembershipVersion() {
return this.translator.getProtoOrBuilder().getMembershipVersion();
}
@Override
public void setMembershipVersion(long version) {
this.translator.getBuilder().setMembershipVersion(version);
}
@Override
public long getMountTableVersion() {
return this.translator.getProtoOrBuilder().getMountTableVersion();
}
@Override
public void setMountTableVersion(long version) {
this.translator.getBuilder().setMountTableVersion(version);
}
}

View File

@ -172,3 +172,47 @@ message GetMountTableEntriesResponseProto {
optional uint64 timestamp = 2;
}
/////////////////////////////////////////////////
// Routers
/////////////////////////////////////////////////
message StateStoreVersionRecordProto {
optional uint64 membershipVersion = 1;
optional uint64 mountTableVersion = 2;
}
message RouterRecordProto {
optional uint64 dateCreated = 1;
optional uint64 dateModified = 2;
optional string address = 3;
optional string status = 4;
optional StateStoreVersionRecordProto stateStoreVersion = 5;
optional string buildVersion = 6;
optional string compileInfo = 7;
optional uint64 dateStarted = 8;
}
message GetRouterRegistrationRequestProto {
optional string routerId = 1;
}
message GetRouterRegistrationResponseProto {
optional RouterRecordProto router = 1;
}
message GetRouterRegistrationsRequestProto {
}
message GetRouterRegistrationsResponseProto {
optional uint64 timestamp = 1;
repeated RouterRecordProto routers = 2;
}
message RouterHeartbeatRequestProto {
optional RouterRecordProto router = 1;
}
message RouterHeartbeatResponseProto {
optional bool status = 1;
}

View File

@ -37,6 +37,7 @@ import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState;
import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
@ -44,6 +45,8 @@ import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
import org.junit.AfterClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -94,6 +97,10 @@ public class TestStateStoreDriverBase {
return randomString;
}
private long generateRandomLong() {
return RANDOM.nextLong();
}
@SuppressWarnings("rawtypes")
private <T extends Enum> T generateRandomEnum(Class<T> enumClass) {
int x = RANDOM.nextInt(enumClass.getEnumConstants().length);
@ -117,6 +124,12 @@ public class TestStateStoreDriverBase {
Map<String, String> destMap = Collections.singletonMap(
generateRandomString(), "/" + generateRandomString());
return (T) MountTable.newInstance(src, destMap);
} else if (recordClass == RouterState.class) {
RouterState routerState = RouterState.newInstance(generateRandomString(),
generateRandomLong(), generateRandomEnum(RouterServiceState.class));
StateStoreVersion version = generateFakeRecord(StateStoreVersion.class);
routerState.setStateStoreVersion(version);
return (T) routerState;
}
return null;

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.records;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.junit.Test;
/**
* Test the Router State records.
*/
public class TestRouterState {
private static final String ADDRESS = "address";
private static final String BUILD_VERSION = "buildVersion";
private static final String COMPILE_INFO = "compileInfo";
private static final long START_TIME = 100;
private static final long DATE_MODIFIED = 200;
private static final long DATE_CREATED = 300;
private static final long FILE_RESOLVER_VERSION = 500;
private static final RouterServiceState STATE = RouterServiceState.RUNNING;
private RouterState generateRecord() throws IOException {
RouterState record = RouterState.newInstance(ADDRESS, START_TIME, STATE);
record.setBuildVersion(BUILD_VERSION);
record.setCompileInfo(COMPILE_INFO);
record.setDateCreated(DATE_CREATED);
record.setDateModified(DATE_MODIFIED);
StateStoreVersion version = StateStoreVersion.newInstance();
version.setMountTableVersion(FILE_RESOLVER_VERSION);
record.setStateStoreVersion(version);
return record;
}
private void validateRecord(RouterState record) throws IOException {
assertEquals(ADDRESS, record.getAddress());
assertEquals(START_TIME, record.getDateStarted());
assertEquals(STATE, record.getStatus());
assertEquals(COMPILE_INFO, record.getCompileInfo());
assertEquals(BUILD_VERSION, record.getBuildVersion());
StateStoreVersion version = record.getStateStoreVersion();
assertEquals(FILE_RESOLVER_VERSION, version.getMountTableVersion());
}
@Test
public void testGetterSetter() throws IOException {
RouterState record = generateRecord();
validateRecord(record);
}
@Test
public void testSerialization() throws IOException {
RouterState record = generateRecord();
StateStoreSerializer serializer = StateStoreSerializer.getSerializer();
String serializedString = serializer.serializeString(record);
RouterState newRecord =
serializer.deserialize(serializedString, RouterState.class);
validateRecord(newRecord);
}
}