kubernetes中pythonapi的二次封装-创新互联

k8s python api二次封装

pip install pprint  kubernetes
import urllib3
from pprint import pprint
from kubernetes import client
from os import path
import yaml

class K8sApi(object):
   def __init__(self):
     # self.config = config.kube_config.load_kube_config()
     urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
     self.configuration = client.Configuration()
     self.configuration.host = "https://192.168.100.111:6443"
     self.configuration.api_key[
       'authorization'] = 'Bearer  token'
     self.configuration.verify_ssl = False
     self.k8s_apps_v1 = client.AppsV1Api(client.ApiClient(self.configuration))
     self.Api_Instance = client.CoreV1Api(client.ApiClient(self.configuration))
     self.Api_Instance_Extensions = client.ExtensionsV1beta1Api(client.ApiClient(self.configuration))

   ####################################################################################################################

   def list_deployment(self, namespace="default"):
     api_response = self.k8s_apps_v1.list_namespaced_deployment(namespace)
     return api_response

   def read_deployment(self, name="nginx-deployment", namespace="default"):
     api_response = self.k8s_apps_v1.read_namespaced_deployment(name, namespace)
     return api_response

   def create_deployment(self, file="deploy-nginx.yaml"):
     with open(path.join(path.dirname(__file__), file)) as f:
       dep = yaml.safe_load(f)
       resp = self.k8s_apps_v1.create_namespaced_deployment(
         body=dep, namespace="default")
       return resp

   def replace_deployment(self, file="deploy-nginx.yaml", name="nginx-deployment", namespace="default"):
     with open(path.join(path.dirname(__file__), file)) as f:
       dep = yaml.safe_load(f)
       resp = self.k8s_apps_v1.replace_namespaced_deployment(name, namespace,
                                  body=dep)
       return resp

   def delete_deployment(self, name="nginx-deployment", namespace="default"):
     api_response = self.k8s_apps_v1.delete_namespaced_deployment(name, namespace)
     return api_response

   ####################################################################################################################

   def list_namespace(self):
     api_response = self.Api_Instance.list_namespace()
     return api_response

   def read_namespace(self, name="default"):
     api_response = self.Api_Instance.read_namespace(name)
     return api_response

   def create_namespace(self, file="pod-nginx.yaml"):
     with open(path.join(path.dirname(__file__), file)) as f:
       dep = yaml.safe_load(f)
       api_response = self.Api_Instance.create_namespace(body=dep)
       return api_response

   def replace_namespace(self, file="pod-nginx.yaml", name="default"):
     with open(path.join(path.dirname(__file__), file)) as f:
       dep = yaml.safe_load(f)
     api_response = self.Api_Instance.replace_namespace(name, body=dep)
     return api_response

   def delete_namespace(self, name="default", namespace="default"):
     api_response = self.Api_Instance.delete_namespace(name)
     return api_response

   ####################################################################################################################

   def list_node(self):
     api_response = self.Api_Instance.list_node()
     data = {}
     for i in api_response.items:
       data[i.metadata.name] = {"name": i.metadata.name,
                   "status": i.status.conditions[-1].type if i.status.conditions[
                                          -1].status == "True" else "NotReady",
                   "ip": i.status.addresses[0].address,
                   "kubelet_version": i.status.node_info.kubelet_version,
                   "os_image": i.status.node_info.os_image,
                   }
     return data

   def list_pod(self):
     api_response = self.Api_Instance.list_pod_for_all_namespaces()
     data = {}
     for i in api_response.items:
       data[i.metadata.name] = {"ip": i.status.pod_ip, "namespace": i.metadata.namespace}
     return data

   def read_pod(self, name="nginx-pod", namespace="default"):
     api_response = self.Api_Instance.read_namespaced_pod(name, namespace)
     return api_response

   def create_pod(self, file="pod-nginx.yaml", namespace="default"):
     with open(path.join(path.dirname(__file__), file)) as f:
       dep = yaml.safe_load(f)
       api_response = self.Api_Instance.create_namespaced_pod(namespace, body=dep)
       return api_response

   def replace_pod(self, file="pod-nginx.yaml", name="nginx-pod", namespace="default"):
     with open(path.join(path.dirname(__file__), file)) as f:
       dep = yaml.safe_load(f)
       api_response = self.Api_Instance.replace_namespaced_pod(name, namespace, body=dep)
     return api_response

   def delete_pod(self, name="nginx-pod", namespace="default"):
     api_response = self.Api_Instance.delete_namespaced_pod(name, namespace)
     return api_response

   ####################################################################################################################

   def list_service(self):
     api_response = self.Api_Instance.list_service_for_all_namespaces()
     return api_response

   def read_service(self, name="", namespace="default"):
     api_response = self.Api_Instance.read_namespaced_service(name, namespace)
     return api_response

   def create_service(self, file="service-nginx.yaml", namespace="default"):
     with open(path.join(path.dirname(__file__), file)) as f:
       dep = yaml.safe_load(f)
       api_response = self.Api_Instance.create_namespaced_service(namespace, body=dep)
       return api_response

   def replace_service(self, file="pod-nginx.yaml", name="hequan", namespace="default"):
     with open(path.join(path.dirname(__file__), file)) as f:
       dep = yaml.safe_load(f)
       api_response = self.Api_Instance.replace_namespaced_service(name, namespace, body=dep)
     return api_response

   def delete_service(self, name="hequan", namespace="default"):
     api_response = self.Api_Instance.delete_namespaced_service(name, namespace)
     return api_response

   ####################################################################################################################

   def list_ingress(self):
     api_response = self.Api_Instance_Extensions.list_ingress_for_all_namespaces()
     return api_response

   def read_ingress(self, name="", namespace="default"):
     api_response = self.Api_Instance_Extensions.read_namespaced_ingress(name, namespace)
     return api_response

   def create_ingress(self, file="ingress-nginx.yaml", namespace="default"):
     with open(path.join(path.dirname(__file__), file)) as f:
       dep = yaml.safe_load(f)
       api_response = self.Api_Instance_Extensions.create_namespaced_ingress(namespace, body=dep)
       return api_response

   def replace_ingress(self, name="", file="ingress-nginx.yaml", namespace="default"):
     with open(path.join(path.dirname(__file__), file)) as f:
       dep = yaml.safe_load(f)
       api_response = self.Api_Instance_Extensions.replace_namespaced_ingress(name=name, namespace=namespace,
                                          body=dep)
       return api_response

   def delete_ingress(self, name="hequan", namespace="default"):
     api_response = self.Api_Instance_Extensions.delete_namespaced_ingress(name, namespace)
     return api_response

   #####################################################################################################################

   def list_stateful(self):
     api_response = self.k8s_apps_v1.list_stateful_set_for_all_namespaces()
     return api_response

   def read_stateful(self, name="nginx-deployment", namespace="default"):
     api_response = self.k8s_apps_v1.read_namespaced_stateful_set(name, namespace)
     return api_response

   def create_stateful(self, file="deploy-nginx.yaml"):
     with open(path.join(path.dirname(__file__), file)) as f:
       dep = yaml.safe_load(f)
       resp = self.k8s_apps_v1.create_namespaced_stateful_set(
         body=dep, namespace="default")
       return resp

   def replace_stateful(self, file="deploy-nginx.yaml", name="nginx-deployment", namespace="default"):
     with open(path.join(path.dirname(__file__), file)) as f:
       dep = yaml.safe_load(f)
       resp = self.k8s_apps_v1.replace_namespaced_stateful_set(name, namespace,
                                   body=dep)
       return resp

   def delete_stateful(self, name="nginx-deployment", namespace="default"):
     api_response = self.k8s_apps_v1.delete_namespaced_stateful_set(name, namespace)
     return api_response

   ####################################################################################################################

if __name__ == '__main__':
   def test():
     obj = K8sApi()
     ret = obj.list_deployment()
     pprint(ret)

   test()

另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。

创新互联专注于郴州企业网站建设,成都响应式网站建设公司,商城开发。郴州网站建设公司,为郴州等地区提供建站服务。全流程按需网站开发,专业设计,全程项目跟踪,创新互联专业和态度为您提供的服务
当前标题:kubernetes中pythonapi的二次封装-创新互联
分享URL:http://hbruida.cn/article/jijji.html