产研效率提升-工具篇-消息中心
作者:QA 团队 韩旭
摘要:
组织在开发高质量产品过程中,可以将业务改进的焦点集中在:方法、人、工具三个方面。三者的粘合剂就是过程。过程需要高效的消息传递。
人-工具-方法-过程的关系:

过程(流程):过程是各种方法论的实质化体现,一个企业不同团队、环节使用的不同的方法的衔接就变成了流程。例如我们做软件服务的企业常用的开发过程包括瀑布开发、敏捷开发,都是软件工程和敏捷思维具体化的表现。
人:人(企业员工/外部客户等)是过程里面必不可少的角色,人根据团队、分工、角色不同,处在过程中的各个环节,执行过程中的各种活动,保障整个过程可以正常运转。例如典型的瀑布开发过程从市场分析→需求分析→方案设计→编码实现→测试验证→交付维护几个阶段,其中方案设计可能还包括架构设计→概要设计→详细设计等细分的设计环节(活动),各个环节的上下游会设定一定的准入、准出条件来作为流转依据。产品经理主要处于需求阶段、开发处于设计阶段、测试处于验证阶段、运维处于部署和运维阶段。
工具:工欲善其事、必先利其器,各种不同工具是为了企业更高效、更高质量执行过程的必要的具体手段和载体。如我们软件开发过程中使用的项目管理工具JIRA、TAPD、禅道、GIT、SVN、FTP等等。
在一个企业发展的不同阶段,随着新方法的引入、不同工具的使用和人员的加入,会和之前既定形成的过程、工具、人员碰撞出新的矛盾和问题,导致生产力下降,为了尽量的避免或解决这种状况,就需要快速的解决已经或可能出现的各种问题。
背景
我司(循环智能)业务主要集中在ToB行业并且处于一个快速发展的阶段,业务量、人员都快速增长,系统架构变得更复杂,之前老的流程和办法已经无法完全满足产品高质量、快速、标准化的交付给客户。在最近我司发展的1-2年时间,我司整体产研大概经过了几个阶段:
业务少:系统架构简单、客户交付复杂度低、职能团队管理
业务量开始增长:系统架构从单体架构向微服务架构演进、客户交付复杂度增加
业务量快速增长:产研组织架构由职能团队向强矩阵团队转变、团队规模快速扩张、产品交付速度要更快、产品质量要求更高……
上述一系列的变化需要选择合适的方法、过程和工具来不断匹配当前公司的战略目标、解决一系列的各种问题。这篇文章主要给大家分享一个公司发展过程中产研团队遇到的一个具体问题和解决方案,其他问题和解决问题的方法会在后续按照质量体系、流程体系、问题处理的经验方法几个方面做分享。
遇到的具体问题
我司目前 IM 工具使用飞书,项目管理工具 tapd、运维的账号管理使用的 ldap……不同的信息零散的分布在不同的工具或者系统上很难联动,虽然目前使用的IM工具飞书已经提供了丰富的机器人功能,但系统之间的隔离、不同消息和受众的消息策略需要重复建设、一些关键通知、告警消息无法集中收集和分析,对效率提升、质量改进和快速响应都有一定的限制。
例如我司目前使用的tapd在测试提交Bug之后研发并无法第一时间知道Bug被提交了(虽然tapd有消息中心和邮件提醒,但我们已经习惯了飞书通知),提交的Bug是迭代的Bug还是线上生产问题,这样就需要人为的通知或者最终导致问题解决过慢影响迭代进度或者导致内外部客户的抱怨。
解决方案
为了解决类似的这种问题我们期待有一种以飞书/手机为触达终端、各种不同消息均可接入、消息策略集中配置管理、消息可分类分析的设想,用以推进我司的效率提升、质量提升和过程优化。
消息中心整体设计

tapd、消息中心、飞书打通方案

飞书开发自研机器人,实现如下功能
群消息
私发消息
群@消息
消息回复
tapd企业版配置消息助手
消息回调到消息中心服务
消息中心根据消息类型+消息策略格式化消息
消息中心将消息回调给飞书自研机器人
机器人根据策略通知到群、个人或@某人(目前期望飞书提供加急消息功能,丰富消息策略)
备注:tapd和飞书属于两个账号体系,目前需要做映射绑定,为了简化账号管理,目前在实现ladp+飞书+tapd的账号统一管理服务,将各个系统账号打通。
飞书创建机器人和调用API配置
自定义webhook机器人不能单独给某人发送消息,自建应用机器人不仅可以单独私发消息还可以回复某人消息,所以我们选择了后者自建应用机器人,不过两种配置方式我们都来说一下。
一、自定义 webhook 机器人
1.群组添加机器人入口

2.拷贝webhook地址
3.调用方式
只需要POST方式调用webhook地址即可实现webhook自定义机器人发送消息,可参考: 例子如下:
curl -X POST -H "Content-Type: application/json"
-d '{"msg_type":"text","content":{"text":"request example"}}'
二.自建应用机器人

2.凭证与基础信息

3.权限管理申请对应的权限

4.消息回复功能配置
需要配置消息回复回调地址

5.版本管理与发布

6.企业管理员审批
企业管理员审批上线后可以在机器人中可以查看到自建机器人

三.自建应用干货
备注:消息中心代码涉及公司保密协议不方便提供,不过提供无消息回复代码示例
1.需要机器人消息回复
代码示例
#!/usr/bin/env python
# --coding:utf-8--
from http.server import BaseHTTPRequestHandler, HTTPServer
from os import path
import json
from urllib import request, parse
APP_ID = "cli_XXXX"
APP_SECRET = "XXXX"
APP_VERIFICATION_TOKEN = "XXXX"
class RequestHandler(BaseHTTPRequestHandler):
def do_POST(self):
# 解析请求 body
req_body = self.rfile.read(int(self.headers['content-length']))
obj = json.loads(req_body.decode("utf-8"))
print(req_body)
# 校验 verification token 是否匹配,token 不匹配说明该回调并非来自开发平台
token = obj.get("token", "")
if token != APP_VERIFICATION_TOKEN:
print("verification token not match, token =", token)
self.response("")
return
# 根据 type 处理不同类型事件
type = obj.get("type", "")
if "url_verification" == type: # 验证请求 URL 是否有效
self.handle_request_url_verify(obj)
elif "event_callback" == type: # 事件回调
# 获取事件内容和类型,并进行相应处理,此处只关注给机器人推送的消息事件
event = obj.get("event")
if event.get("type", "") == "message":
self.handle_message(event)
return
return
def handle_request_url_verify(self, post_obj):
# 原样返回 challenge 字段内容
challenge = post_obj.get("challenge", "")
rsp = {'challenge': challenge}
self.response(json.dumps(rsp))
return
def handle_message(self, event):
# 此处只处理 text 类型消息,其他类型消息忽略
msg_type = event.get("msg_type", "")
if msg_type != "text":
print("unknown msg_type =", msg_type)
self.response("")
return
# 调用发消息 API 之前,先要获取 API 调用凭证:tenant_access_token
access_token = self.get_tenant_access_token()
if access_token == "":
self.response("")
return
# 机器人 echo 收到的消息
self.send_message(access_token, event.get("open_id"), event.get("text"))
self.response("")
return
def response(self, body):
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(body.encode())
def get_tenant_access_token(self):
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
headers = {
"Content-Type" : "application/json"
}
req_body = {
"app_id": APP_ID,
"app_secret": APP_SECRET
}
data = bytes(json.dumps(req_body), encoding='utf8')
req = request.Request(url=url, data=data, headers=headers, method='POST')
try:
response = request.urlopen(req)
except Exception as e:
print(e.read().decode())
return ""
rsp_body = response.read().decode('utf-8')
rsp_dict = json.loads(rsp_body)
code = rsp_dict.get("code", -1)
if code != 0:
print("get tenant_access_token error, code =", code)
return ""
return rsp_dict.get("tenant_access_token", "")
def send_message(self, token, open_id, text):
url = "https://open.feishu.cn/open-apis/message/v4/send/"
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + token
}
req_body = {
"open_id": open_id,
"msg_type": "text",
"content": {
"text": text
}
}
data = bytes(json.dumps(req_body), encoding='utf8')
req = request.Request(url=url, data=data, headers=headers, method='POST')
try:
response = request.urlopen(req)
except Exception as e:
print(e.read().decode())
return
rsp_body = response.read().decode('utf-8')
rsp_dict = json.loads(rsp_body)
code = rsp_dict.get("code", -1)
if code != 0:
print("send message error, code = ", code, ", msg =", rsp_dict.get("msg", ""))
def run():
port = 8000
server_address = ('', port)
httpd = HTTPServer(server_address, RequestHandler)
print("start.....")
httpd.serve_forever()
if __name__ == '__main__':
run()
2.飞书消息API
3.无消息回复
代码示例
# -*- coding: utf-8 -*-
# @Auth : hanxu
# @Time : 2021-06
import os
import logging
import datetime
import json
import requests
from flask import Flask, request
import pymysql
app = Flask(__name__)
@app.before_request
def before_request():
try:
#可以做参数校验
url = request.full_path
method = request.method
headers = request.headers
body = request.json
print(url)
print(method)
print(headers)
print(body)
except Exception as e:
logging.info(e)
@app.route('/info/notice/create', methods=['POST'])
def info_create():
"""
消息通知
:return:
"""
json_param = request.json
bug_info = json_param['events']
bug_create = json_param['events']['bug::create']
new = bug_create['new']
product_name = new['custom_field_6']
bug_id = bug_create['id']
created_user = bug_create['user']
if exists_key(new, 'current_owner'):
current_owner = new['current_owner']
else:
current_owner = 'hanxu;'
if exists_key(new, 'title'):
title = new['title']
else:
title = '无标题'
if exists_key(new, 'de'):
developer = new['de']
else:
developer = ''
if exists_key(new, 'te'):
tester = new['te']
else:
tester = ''
if exists_key(new, 'severity'):
severity = new['severity']
else:
severity = ''
logging.info(
"产品名称{};缺陷ID{};标题:{};严重等级{};处理人{};创建人{};开发人{};测试人{}".format(product_name, bug_id, title, severity,
current_owner, created_user,
developer, tester))
chart_id='' #如果不需要在群中发送消息就换成用户open_id即可
params={
"receive_id": "%s" % chart_id,
"msg_type": "post",
"content": "{\"zh_cn\":{\"title\":\"bug单创建通知\",\"content\":[[{\"tag\":\"text\",\"text\":\"缺陷ID:\"},{\"tag\":\"text\",\"text\":\"%s\"}],[{\"tag\":\"text\",\"text\":\"缺陷标题:\"},{\"tag\":\"text\",\"text\":\"%s\"}],[{\"tag\":\"text\",\"text\":\"严重等级:\"},{\"tag\":\"text\",\"text\":\"%s\"}],[{\"tag\":\"text\",\"text\":\"处理人:\"}%s],[{\"tag\":\"text\",\"text\":\"创建人:\"},{\"tag\":\"text\",\"text\":\"%s\"}],[{\"tag\":\"text\",\"text\":\"开发者:\"},{\"tag\":\"text\",\"text\":\"%s\"}],[{\"tag\":\"text\",\"text\":\"测试者:\"},{\"tag\":\"text\",\"text\":\"%s\"}],[{\"tag\":\"text\",\"text\":\"bug单链接 :\"},{\"tag\":\"a\",\"href\":\"https://www.tapd.cn/44232473/bugtrace/bugs/view?bug_id=%s\",\"text\":\"链接\"}]]}}" % (
bug_id[-7:], title, severity, receiver_tag, created_user, developer, tester, bug_id)
}
create_res = send_message_to_user_at_chart(get_app_access_token(),params)
logging.info(f"bug单创建结果:{create_res}")
return "200"
def exists_key(bug, key):
"""
判断key值是否存在
:param bug:
:param key:
:return:
"""
return key in bug.keys()
def log_ininit():
"""
初始化日志
:return:
"""
log_path = os.popen('pwd').read().strip() # 日志文件路径
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
filename='%s/app-%s.log' % (log_path, datetime.datetime.now().strftime('%Y-%m-%d')),
filemode='w')
logger = logging.getLogger(__name__)
sh = logging.StreamHandler() # 往屏幕上输出
# sh.setFormatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s') # 设置屏幕上显示的格式
logger.setLevel(logging.INFO)
logger.addHandler(sh)
def get_app_access_token():
"""
获取 app_access_token
:return:
"""
url = "https://open.feishu.cn/open-apis/auth/v3/app_access_token/internal"
param = json.dumps({
"app_id": "cli_xxx",
"app_secret": "xxxxxxxx"
})
header = {
"Content-Type": 'application/json; charset=utf-8'
}
r = requests.post(url, data=param, headers=header)
return "Bearer {}".format(r.json()['app_access_token'])
def send_message_to_user_at_chart(access_token, params):
"""
在群里给某个人发送消息
:param access_token: token
:param params: 参数
:return:
"""
url = "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=chat_id" #这里可根据tupen换成open_id
header = {
"Content-Type": 'application/json',
"Authorization": '%s' % access_token
}
r = requests.post(url, data=json.dumps(params), headers=header)
return r.text
if __name__ == "__main__":
log_ininit()
app.run(host="0.0.0.0", port=8000, debug=True)
tapd自动化助手:
1.创建自动化规则
选择对象需求、bug、任务、流水线等事件发生可以调用webhook
这里不得不吐槽下tapd的推送数据格式,好多实际推送数据比tapd示例中的字段还是增加不少,好多字段含义靠猜出来的
{
"auto_task_id": "1020355972000001849",
"auto_task_action_id": "1020355972000001749",
"workspace_id": 21000072,
"events": {
"story::status_change": {
"workspace_id": 21000072,
"user": "lincoln",
"object_type": "story",
"id": "1021000072854809635",
"timestamp": 1595388790,
"new": {
"completed": "2020-07-22 11:33:10",
"id": "1021000072854809635",
"status": "resolved"
},
"old": {
"completed": null,
"id": "1021000072854809635",
"status": "developing"
},
"event_key": "story::status_change",
"from": "web"
}
},
"actions": [
"timer_trigger_search":{
"story":[
1020358496854838221,
1020358496854835273,
1020358496854832911
]
},
],
"condition": null
}
2.字段说明
