好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

ansible开发(1)

Big Picture

ansible 的用途和使用场景就不再赘述了,日常使用中难免会遇到一些定制化需求,比如对接自动化操作平台需要将ansible api化,对接一些其他平台/网络设备/存储设备等需要自己定制模块进行自动化运维等,所以ansible的二次开发比较常见。

开发类别

ansible 开发主要分为4大类:

api的调用&&重封装、modules的开发、plugin的开发、源码的修改。

api的调用&&重封装

ansible 一般使用都是基于命令行的方式进行执行的,但对于有些公司需要将ansible作为自动化运维平台的底层技术的时候,就需要通过api的方式进行调用, 主要是需要重新编写一下callback(ansible返回)以及动态inventory。

举个例子:

 #!/usr/bin/env python3
#-*- coding:utf-8 -*-
# author:barney
# ansible version: ansible==3.2.0, ansible-base==2.10.7
import re
import requests
import json
import datetime
from oslo_config import cfg
from oslo_log import log
import datetime
from vmc.controller import ControllerBase
# ansible 相关
# ansible 不支持eventlet的monkeypatch 对 select的封装,所以需要关闭
import shutil  
from ansible.module_utils.common.collections import ImmutableDict  
from ansible.parsing.dataloader import DataLoader  
from ansible.vars.manager import VariableManager  
from ansible.inventory.manager import InventoryManager  
from ansible.playbook.play import Play  
from ansible.executor.task_queue_manager import TaskQueueManager  
from ansible.plugins.callback import CallbackBase  
from ansible import context  
import ansible.constants as C
import yaml

# Module-level logger named after this module.
LOG = log.getLogger(__name__)
# Global oslo.config object. The ``ansible`` option group read by MyAnsiable
# (remote_user, forknum, timeout) is presumably registered elsewhere in the
# project -- TODO confirm.
CONF = cfg.CONF

  
class ResultCallback(CallbackBase):
    """Callback that keeps task results in memory instead of printing them.

    Overrides a few ``CallbackBase`` hooks. Each hook files the result object
    into a per-outcome dict under the key ``"<host name>-<timestamp>"``, where
    the timestamp is ``datetime.now()`` formatted as ``%Y%m%d%H%M%S%f``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # One bucket per outcome; ``task_ok`` is created here but no hook in
        # this class writes to it.
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}
        self.task_ok = {}

    def _store_result(self, bucket, result):
        """File ``result`` into ``bucket`` under a host-plus-timestamp key."""
        host_name = result._host.get_name()
        stamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S%f')
        bucket[host_name + "-" + stamp] = result

    def v2_runner_on_unreachable(self, result):
        """Remember a result for a host that could not be reached."""
        self._store_result(self.host_unreachable, result)

    def v2_runner_on_ok(self, result, **kwargs):
        """Remember a successful task result."""
        self._store_result(self.host_ok, result)

    def v2_runner_on_failed(self, result, **kwargs):
        """Remember a failed task result."""
        self._store_result(self.host_failed, result)

class MyAnsiable():
    """Small facade over the Ansible Python API for ad-hoc tasks and playbooks.

    Results are captured by a ``ResultCallback`` whose buckets are keyed
    ``"<host>-<timestamp>"``; the ``_collect_*`` helpers turn those buckets
    into plain lists of dicts.

    NOTE(review): the callback buckets are created once in ``__init__`` and
    never cleared, so a second ``run()``/``playbook()`` call on the same
    instance also returns the results of the earlier call.
    """

    def __init__(self,
                 connection='smart',
                 remote_user='ops',
                 remote_password=None,
                 private_key_file=None,
                 sudo=None,
                 sudo_user='root',
                 ask_sudo_pass=False,
                 module_path=None,
                 become=True,
                 become_method='sudo',
                 become_user='root',
                 check=False,
                 diff=False,
                 listhosts=None,
                 listtasks=None,
                 listtags=None,
                 verbosity=5,
                 syntax=None,
                 start_at_task=None,
                 forks=10,
                 timeout=5,
                 tags=None,
                 skip_tags=None,
                 inventory=None):
        """Set the global Ansible CLI options and build the API objects.

        Most keyword arguments are forwarded unchanged into
        ``context.CLIARGS``; pass a value to override its default.

        NOTE(review): ``remote_user``, ``forks`` and ``timeout`` are accepted
        but ignored -- the values always come from ``CONF.ansible.remote_user``,
        ``CONF.ansible.forknum`` and ``CONF.ansible.timeout``. ``check`` and
        ``diff`` are accepted but not forwarded at all.

        :param tags: list of tags to run; ``None`` (default) means no tags.
        :param skip_tags: list of tags to skip; ``None`` (default) means none.
        :param remote_password: stored as-is and handed to Ansible as
            ``passwords``. NOTE(review): Ansible presumably expects a mapping
            such as ``{"conn_pass": ...}`` here -- confirm against callers.
        :param inventory: inventory source string; defaults to ``"localhost,"``.
        """
        # Fresh lists per call instead of shared mutable default arguments.
        tags = [] if tags is None else tags
        skip_tags = [] if skip_tags is None else skip_tags

        context.CLIARGS = ImmutableDict(
            connection=connection,
            remote_user=CONF.ansible.remote_user,
            private_key_file=private_key_file,
            sudo=sudo,
            sudo_user=sudo_user,
            ask_sudo_pass=ask_sudo_pass,
            module_path=module_path,
            become=become,
            become_method=become_method,
            become_user=become_user,
            verbosity=verbosity,
            listhosts=listhosts,
            listtasks=listtasks,
            listtags=listtags,
            syntax=syntax,
            start_at_task=start_at_task,
            forks=CONF.ansible.forknum,
            tags=tags,
            skip_tags=skip_tags,
            timeout=CONF.ansible.timeout
        )

        self.inventory = inventory if inventory else "localhost,"

        # Data loader (parses YAML/JSON sources).
        self.loader = DataLoader()

        # Inventory object built from the inventory source string.
        self.inv_obj = InventoryManager(loader=self.loader, sources=self.inventory)

        # Passwords handed to the task queue manager / playbook executor.
        self.passwords = remote_password

        # Callback plugin instance that collects results.
        self.results_callback = ResultCallback()

        # Variable manager.
        self.variable_manager = VariableManager(self.loader, self.inv_obj)

    @staticmethod
    def _split_key(key):
        """Split a ``"<host>-<timestamp>"`` callback key into ``(host, timestamp)``.

        The split happens at the LAST hyphen so host names that themselves
        contain hyphens (``web-01``) stay intact; the timestamp part never
        contains one.
        """
        host, sep, stamp = key.rpartition("-")
        if not sep:
            # No separator at all: treat the whole key as the host name.
            return key, ""
        return host, stamp

    def _collect_run_results(self):
        """Flatten the callback buckets into the list returned by ``run()``."""
        callback = self.results_callback
        collected = []
        for key, res in callback.host_ok.items():
            host, _ = self._split_key(key)
            # Modules such as ``ping`` return no "stdout" key; fall back to
            # "ok" instead of raising KeyError.
            collected.append({"host": host, "status": "success",
                              "stdout": res._result.get("stdout", "ok")})
        for key, res in callback.host_failed.items():
            host, _ = self._split_key(key)
            collected.append({"host": host, "status": "failed",
                              "stdout": res._result.get("stderr", "unknown error")})
        for key in callback.host_unreachable:
            host, _ = self._split_key(key)
            collected.append({"host": host, "status": "unreachable",
                              "stdout": "connect timeout"})
        return collected

    def run(self, gether_facts="no", module="ping", args='', task_time=30):
        """Run a single ad-hoc module against the inventory and return results.

        :param gether_facts: value for the play's ``gather_facts`` setting.
        :param module: module name to execute.
        :param args: module arguments.
        :param task_time: seconds to wait for an asynchronous task. It is only
            meaningful for the async task template, which is commented out
            below, so it is currently unused. For a small number of servers
            synchronous execution is sufficient.
        :returns: list of ``{"host", "status", "stdout"}`` dicts where status
            is ``"success"``, ``"failed"`` or ``"unreachable"``.
        """
        play_source = dict(
            name="Ansible Used for Auto Mgr",
            hosts=self.inventory,
            gather_facts=gether_facts,
            tasks=[
                # Async task template:
                # {"action":{"module": module, "args": args}, "async": task_time, "poll": 0}
                # Synchronous execution is the default.
                {"action": {"module": module, "args": args}}])

        play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader)

        tqm = None
        fin_result = []
        try:
            tqm = TaskQueueManager(
                inventory=self.inv_obj,
                variable_manager=self.variable_manager,
                loader=self.loader,
                passwords=self.passwords,
                stdout_callback=self.results_callback)

            tqm.run(play)
            fin_result = self._collect_run_results()
        finally:
            if tqm is not None:
                tqm.cleanup()
            # Remove Ansible's local temp directory, ignoring errors.
            shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)

        return fin_result

    def update_yml_host(self, ymlfile, hosts):
        """Rewrite ``ymlfile`` in place so every top-level section targets ``hosts``.

        :param ymlfile: path of the playbook YAML file.
        :param hosts: value stored under each section's ``"hosts"`` key.
        """
        with open(ymlfile, 'r', encoding='utf-8') as f:
            # NOTE(security): FullLoader can build more than plain data types;
            # only use this on playbook files from a trusted source (consider
            # yaml.safe_load).
            yml = yaml.load(f.read(), Loader=yaml.FullLoader)

        if not yml:
            # Empty document: nothing to update, leave the file untouched.
            return

        for section in yml:
            section["hosts"] = hosts

        # Reopen for writing only after the read handle has been closed.
        with open(ymlfile, 'w', encoding='utf-8') as w_f:
            yaml.dump(yml, w_f)

    def _collect_playbook_results(self):
        """Group the callback buckets per host for ``playbook()``.

        Returns a list of ``{"host", "status", "step": [...]}`` dicts in order
        of first appearance. Results whose payload contains ``ansible_facts``
        are skipped. A failed step marks the host ``"failed"``.

        NOTE(review): an unreachable step appended to a host that already has
        an entry does not change that host's status -- confirm this is intended.
        """
        callback = self.results_callback
        fin_result = []
        by_host = {}  # host name -> its entry in fin_result (O(1) lookup)

        def add_step(host, status, stdout, exectime, override):
            step = {"exectime": exectime, "status": status, "stdout": stdout}
            entry = by_host.get(host)
            if entry is None:
                entry = {"host": host, "status": status, "step": [step]}
                by_host[host] = entry
                fin_result.append(entry)
                return
            if override:
                entry["status"] = status
            entry["step"].append(step)

        # Successful steps: empty or missing stdout is reported as "ok".
        for key, res in callback.host_ok.items():
            if 'ansible_facts' in res._result:
                continue
            host, exectime = self._split_key(key)
            add_step(host, "success", res._result.get("stdout") or "ok", exectime, False)

        # Failed steps: report stderr and flag the whole host as failed.
        for key, res in callback.host_failed.items():
            if 'ansible_facts' in res._result:
                continue
            host, exectime = self._split_key(key)
            add_step(host, "failed", res._result.get("stderr", "unknown error"), exectime, True)

        # Unreachable steps.
        for key, res in callback.host_unreachable.items():
            if 'ansible_facts' in res._result:
                continue
            host, exectime = self._split_key(key)
            add_step(host, "unreachable", "connect timeout", exectime, False)

        return fin_result

    def playbook(self, playbooks):
        """Run playbook files against the inventory and return per-host results.

        :param playbooks: list of playbook file paths. Only the first file has
            its ``hosts`` rewritten to this instance's inventory before the run.
        :returns: list of ``{"host", "status", "step"}`` dicts, each step being
            ``{"exectime", "status", "stdout"}``.

        TODO: add a tag marker to every step of the result.
        """
        from ansible.executor.playbook_executor import PlaybookExecutor

        playbookfile = playbooks[0]

        # Point the playbook file at this instance's inventory.
        self.update_yml_host(playbookfile, self.inventory)

        playbook = PlaybookExecutor(playbooks=playbooks,
                                    inventory=self.inv_obj,
                                    variable_manager=self.variable_manager,
                                    loader=self.loader,
                                    passwords=self.passwords)
        # Route results to our collecting callback.
        playbook._tqm._stdout_callback = self.results_callback

        playbook.run()

        return self._collect_playbook_results()

    def judge_range(self, fin, hn):
        """Return True if ``fin`` already contains an entry whose "host" is ``hn``."""
        return any(entry["host"] == hn for entry in fin)
 
个人公众号, 分享一些日常开发,运维工作中的日常以及一些学习感悟,欢迎大家互相学习,交流

查看更多关于ansible开发(1)的详细内容...

  阅读:39次

上一篇: ansible开发(2)

下一篇:Gunicorn基本使用