import os, re, time, datetime, base64, requests, json, hashlib
from requests_toolbelt import MultipartEncoder
from urllib import parse
# 群发文本消息
def SendGroupMsg(textContent, webHookUrl, mentioned_list=[],mentioned_mobile_list=[]):
"""
7 发送微信群组机器人消息
8 :param textContent: 消息内容
9 :param webHookUrl: 群组机器人WebHook
10 :param mentioned_list: userid的列表,提醒群中的指定成员(@某个成员),@all表示提醒所有人
11 :param mentioned_mobile_list: 手机号列表,提醒手机号对应的群成员(@某个成员),@all表示提醒所有人
12 :return:
13 """
# url为群组机器人WebHook,配置项
url = webHookUrl
headers = {
"content-type": "application/json"
}
msg = {"msgtype": "text",
"text": {
"content": textContent,
"mentioned_list":mentioned_list,
"mentioned_mobile_list": mentioned_mobile_list
}} # 发送文本消息27 # 发送请求
try:
result = requests.post(url, headers=headers, json=msg)
return True
except Exception as e:
# print("Requset Failed:", e)
return False
# 群发图片
def sendImg(imgPath,webHookUrl):
"""
发送微信群组机器人图片
图片支持jpg,png两种格式,大小不能超过2Mb
:param imgPath: 图片的本地路径
:param webHookUrl: 群组机器人WebHook
:return:
"""
# url为群组机器人WebHook,配置项
url = webHookUrl
with open(imgPath,"rb") as f:
fd=f.read()
base64Content=str(base64.b64encode(fd),"utf-8")
with open(imgPath,"rb") as f:
fd=f.read()
md = hashlib.md5()
md.update(fd)
md5Content = md.hexdigest()
headers = {"content-type": "application/json"}
msg = {"msgtype": "image","image": {"base64": base64Content,"md5": md5Content}}
# 发送请求
try:
result = requests.post(url, headers=headers, json=msg)
return True
except Exception as e:
# print("Requset Failed:", e)
return False
# 发送群文件
def QYWXSendGroupFile(filepath, url):
"""
发送微信群组机器人文件
:param filepath: 文件路径
:param url: 群组机器人WebHook
:return:
"""
# url为群组机器人WebHook,配置项
url = url
headers = {
"content-type": "application/json"
}
# 发送文件需要先上传文件获取media_id
media_id = UploadFile(filepath, url)
msg = {"msgtype": "file",
"file": {"media_id": media_id}}
# 发送请求
try:
#result =requests.post(url, headers=headers, json=msg)
requests.post(url, headers=headers, json=msg)
return True
except Exception as e:
print("企业微信机器人发送文件失败,详细信息:" + str(e))
return False
# 上传要发送的文件
def UploadFile(filepath, webHookUrl):
"""
企业微信机器人上传文件,发送文件前需要先上传--要求文件大小在5B~20M之间
:param filepath: 文件路径
:param webHookUrl: 群组机器人WebHook
:return: media_id
"""
# url为群组机器人WebHook,配置项
webHookUrl = webHookUrl
params = parse.parse_qs(parse.urlparse(webHookUrl).query)
webHookKey = params['key'][0]
upload_url = f'https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={webHookKey}&type=file'
headers = {"Accept": "application/json, text/plain, */*", "Accept-Encoding": "gzip, deflate",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) appleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36"}
filename = os.path.basename(filepath)
try:
multipart = MultipartEncoder(
fields={'filename': filename, 'filelength': '', 'name': 'media',
'media': (filename, open(filepath, 'rb'), 'application/octet-stream')},
boundary='-------------------------acebdf13572468')
headers['Content-Type'] = multipart.content_type
resp = requests.post(upload_url, headers=headers, data=multipart)
json_res = resp.json()
if json_res.get('media_id'):
# print(f"企业微信机器人上传文件成功,file:{filepath}")
return json_res.get('media_id')
except Exception as e:
# print(f"企业微信机器人上传文件失败,file: {filepath}, 详情:{e}")
print("企业微信机器人上传文件失败,详细信息:" + str(e))
return ""
if __name__=='__main__':
url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxxxxxxx'
textContent = 'test01'
imagePath = '2022050601.png'
#SendGroupMsg(textContent,url)
#sendImg(imagePath,url)
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站不拥有所有权,不承担相关法律责任。如发现有侵权/违规的内容, 请联系我们:dudu818907@gmail.com,本站将立刻清除。