钉钉机器人 自动化发版

钉钉机器人 自动化发版

#1 简介

  • 开发机器人接收消息并调用构建接口, 实现自动化发版
  • 发送指令 -> 机器人接收指令 -> 调用jenkins-job远程构建与部署
  • jenkins配置,勾选job配置的触发远程构建并设置身份验证令牌
#测试 触发远程构建 curl -ks -u user:user_token -X POST    jenkins_url/job/job_name/buildWithParameters?token=job_token 

#2、创建机器人

#2.1 登录钉钉开放平台
#2.2 创建机器人
  • 应用开发 -> 机器人 -> 创建应用
    • 继续使用旧版,名称如cici
    • 应用信息,复制AppSecret
  • 开发管理,修改,消息接收地址

钉钉机器人 自动化发版

  • 创建test企业群, 添加机器人cici
  • 复制群机器人token到default_token
#2.3 运行机器人服务

配置环境变量文件.env_lark

#vim .env_dingtalk  # 钉钉机器人密钥 AppSecret ding_secret=Q-uG5AMlMgC_Tkn6qhz1601xMYfQgxzeQh3xxx #默认 机器人token ding_webhook_default_token=bf5ab6a77cbc1b7c21fcxxx #jenkins JenkinsBaseUrl=https://user:user_token@jenkins.elvin.vip/job/ 

使用docker启动机器人服务

docker rm -f robot-dingtalk &>/dev/null docker run -dit --name robot-dingtalk   --restart=always -h robot-dingtalk --net=host  -v $Dir:/opt --env-file .env_dingtalk  registry.aliyuncs.com/elvin/python:dingtalk-robot  python3 /opt/dingtalk-robot.py 

dingtalk-robot.py 实例在https://gitee.com/alivv/elvin-demo

nignx配置域名和lark反向代理

#dingtalk-cicd location ~ ^/dingtalk-cicd {     proxy_pass http://127.0.0.1:8091;     proxy_set_header Host $host:$server_port;     proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; } 

#3 发送消息测试

钉钉机器人 自动化发版


#3 源码

python实例如下:

#!/usr/bin/python3 # -*- coding: utf-8 -*- # By Elvin , blog.elvin.vip  import os import time import hmac import hashlib import base64 import json from datetime import datetime from flask import Flask, request import requests  # 从环境变量加载配置 ding_secret = os.getenv("ding_secret") ding_webhook_default_token = os.getenv("ding_webhook_default_token")  app = Flask(__name__)  #钉钉发送文本消息 def send_txt_msg(message, webhook_url):     data = { "msgtype": "text","text": {"content": message}}     #requests发送post请求     req = requests.post(webhook_url, json=data)     print(req)  #签名核对 def check_sign(timestamp=int(time.time() * 1000), app_secret=ding_secret):     #钉钉消息头部加密     app_secret_enc = app_secret.encode('utf-8')     string_to_sign = '{}n{}'.format(timestamp, app_secret)     string_to_sign_enc = string_to_sign.encode('utf-8')     hmac_code = hmac.new(app_secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()     sign = base64.b64encode(hmac_code).decode('utf-8')     return sign   # Default route, print user's IP @app.route('/') def remoteIP():     if 'X-Forwarded-For' in request.headers:         ip = request.headers['X-Forwarded-For'].split(',')[0]     else:         ip = request.remote_addr     return ip + "n", 200, [("Server", "Go"), ("City", "Shanghai")]   # 接收@机器人的消息 @app.route('/dingtalk-cicd', methods=["POST"]) def index():     if request.method == "POST":         timestamp = request.headers.get('Timestamp')         sign = request.headers.get('Sign')         if check_sign(timestamp=timestamp) == sign:             req_data = json.loads(str(request.data, 'utf-8'))             sender = req_data.get('senderNick')             text = req_data.get('text').get('content', "").strip()             ddgroup = req_data.get('conversationTitle').strip()             #msg             msg_cicd(ddgroup,text,sender)             return "succeed"         else:             return "not found"     else:         return "method not found"   #筛选消息,执行指令 def msg_cicd(ddgroup,text,sender):     msg = text     sender = sender     ddGroup = ddgroup     print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), " msg->: ",  msg)     #check ddgroup     if ddGroup == "test" or ddGroup == "DevOps":         webhook = ding_webhook_test         appInfoMap = dict(appTest, **appProd)         myMenu = {"help", "test", "prod"}         if msg in appInfoMap:             app_env = appInfoMap[msg][0]             app_name = appInfoMap[msg][1]             app_url = appInfoMap[msg][2]             #app_url = appInfoMap[msg][2] + appInfoMap[msg][1]             app_url = app_url + app_env + "&app_list=" + app_name             if app_env != "":                 #执行通知                 msg = "By:  %snenv:  %snapp:  %s" % (sender, app_env, app_name)                 send_txt_msg(msg,webhook)                 head = { 'User-Agent': "webhook-dingtalk-robot" }                 #向webhook发起post请求                 res = requests.post(url=app_url, headers=head)                 print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "run", app_env, app_name, res.reason)                 return "succeed"             else:                 print(msg, "nothing")                 return "succeed"         elif msg in myMenu:             #打印命令列表             print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "print menu ", msg)             msgTitle = "#命令  名称n"             if msg == "help":                 msgTitle2 = "#命令  获取列表n"                 msg = msgTitle2 + "test  app-test-listnprod  app-prod-list"             elif msg == "test":                 msg = msgTitle                 for i in appTest:                     msg = msg + i + "  " + appInfoMap[i][1] + "n"             elif msg == "prod":                 msg = msgTitle                 for i in appProd:                     msg = msg + i + "  " + appInfoMap[i][1] + "n"             msg = msg.rstrip('n')             send_txt_msg(msg,webhook)             return "succeed"         else:             msg = f"已收到: {msg} n发送 help@cici 查看支持指令"             send_txt_msg(msg,webhook)             return "succeed"      else:         print(datetime.now().strftime('%Y-%m-%d %H:%M:%S')," no ddGroup config for ",ddGroup)         webhook = ding_webhook_default         msg = f"已收到: {msg} n未发现组{ddGroup}支持指令"         send_txt_msg(msg,webhook)         return "succeed"   #webhook ding_webhook_base_url = "https://oapi.dingtalk.com/robot/send?access_token=" ding_webhook_default = ding_webhook_base_url + ding_webhook_default_token  ding_webhook_test = ding_webhook_default  #webhook url for jenkins  JenkinsBaseUrl = os.getenv("JenkinsBaseUrl")  #job appDeploy = "test-app-deploy/buildWithParameters?token=cicdTest&app_branch=master&app_build=true&docker_build=true&create_git_tag=false&notice_msg=true&app_deploy=true&image_update=true&input_pass=true&deploy_tag=tag&deploy_env="  #ci url appDeployUrl = JenkinsBaseUrl + appDeploy  #hybrid list appTest = { "#app-test-k8s-list:": ["","", ""], "s201": ["test","app-web", appDeployUrl], "s202": ["test","app-svc", appDeployUrl], "s203": ["test","app-api", appDeployUrl], "s204": ["test","app-event", appDeployUrl], "s205": ["test","app-admin", appDeployUrl], }  appProd = { "#app-prod-k8s-list:": ["","", ""], "s101": ["prod","app-web", appDeployUrl], "s102": ["prod","app-svc", appDeployUrl], "s103": ["prod","app-api", appDeployUrl], "s104": ["prod","app-event", appDeployUrl], "s105": ["prod","app-admin", appDeployUrl], }  if __name__ == '__main__':     app.run(host='0.0.0.0', port=8091)  

source: https://gitee.com/alivv/elvin-demo

发表评论

评论已关闭。

相关文章