python-wechat-kf/app/services/crypto.py

112 lines
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import hashlib
import struct
import xml.etree.ElementTree as ET
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from app.config import settings
def _get_aes_key() -> bytes:
    """Decode the configured EncodingAESKey into raw AES key bytes.

    The key text is read from ``settings.callback_aes_key``. A single ``=``
    is appended before Base64-decoding so that a 43-character key becomes a
    well-formed 44-character Base64 string (which decodes to 32 bytes).

    Returns:
        The Base64-decoded key bytes.
    """
    # 43 Base64 chars -> add the missing "=" pad char -> 32 decoded bytes.
    padded_b64 = settings.callback_aes_key + "="
    return base64.b64decode(padded_b64)
def verify_signature(token: str, timestamp: str, nonce: str, encrypt_str: str,
                     msg_signature: str) -> bool:
    """Check a callback signature against the locally computed one.

    The expected signature is the SHA-1 hex digest of the four input strings
    (``token``, ``timestamp``, ``nonce``, ``encrypt_str``) sorted
    lexicographically and concatenated.

    Args:
        token: Callback token shared with the platform.
        timestamp: ``timestamp`` value received with the request.
        nonce: ``nonce`` value received with the request.
        encrypt_str: Encrypted payload (or ``echostr``) that was signed.
        msg_signature: Signature received with the request.

    Returns:
        ``True`` when ``msg_signature`` equals the computed digest,
        ``False`` otherwise (including when it is not a string).
    """
    import hmac

    if not isinstance(msg_signature, str):
        return False

    raw = "".join(sorted([token, timestamp, nonce, encrypt_str]))
    local_sig = hashlib.sha1(raw.encode()).hexdigest()

    # Compare in constant time: the signature is attacker-controlled, and a
    # plain ``==`` can leak how many leading characters matched. Comparing
    # bytes (with un-encodable characters replaced) means arbitrary Unicode
    # input simply fails to match instead of raising.
    return hmac.compare_digest(
        local_sig.encode("ascii"),
        msg_signature.encode("utf-8", "replace"),
    )
def decrypt(encrypt_str: str) -> str:
    """Decrypt a Base64-encoded AES-256-CBC callback payload.

    The key comes from ``_get_aes_key()`` and its first 16 bytes are used as
    the IV. After the padding is removed the plaintext is laid out as::

        16 random bytes | 4-byte big-endian msg_len | msg | receive id

    Only ``msg`` is returned; the trailing receive id is not checked here.

    Args:
        encrypt_str: Base64 text of the ciphertext.

    Returns:
        The embedded message decoded as UTF-8.

    Raises:
        ValueError: If the padding is invalid or the plaintext is too short
            for the length it declares.
    """
    aes_key = _get_aes_key()
    cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_key[:16])
    padded = cipher.decrypt(base64.b64decode(encrypt_str))

    # The platform pads with PKCS#7 to a multiple of 32 bytes, so the pad
    # length may legitimately be anywhere in 1..32. Stripping with a 16-byte
    # block size would reject every payload carrying 17..32 pad bytes.
    pad_len = padded[-1] if padded else 0
    if (
        not 1 <= pad_len <= 32
        or pad_len > len(padded)
        or padded[-pad_len:] != bytes([pad_len]) * pad_len
    ):
        raise ValueError("解密失败：填充无效")
    plain = padded[:-pad_len]

    header_len = 16 + 4  # random prefix + length field
    if len(plain) < header_len:
        raise ValueError("解密失败：明文长度不足")

    # msg_len is stored in network byte order right after the random prefix.
    (msg_len,) = struct.unpack_from("!I", plain, 16)
    msg_end = header_len + msg_len
    if msg_end > len(plain):
        raise ValueError("解密失败：消息长度字段超出明文范围")

    return plain[header_len:msg_end].decode("utf-8")
def encrypt(plain_text: str) -> str:
    """Encrypt a message with AES-256-CBC and return it as Base64 text.

    The plaintext that gets encrypted is laid out as::

        16 random bytes | 4-byte big-endian msg_len | msg | settings.corpid

    The key comes from ``_get_aes_key()`` and its first 16 bytes are used as
    the IV.

    Args:
        plain_text: Message to encrypt; encoded as UTF-8.

    Returns:
        Base64 text of the ciphertext.
    """
    import os
    from Crypto.Util.Padding import pad

    aes_key = _get_aes_key()
    msg_bytes = plain_text.encode("utf-8")
    raw = b"".join((
        os.urandom(16),
        struct.pack("!I", len(msg_bytes)),  # length in network byte order
        msg_bytes,
        settings.corpid.encode("utf-8"),
    ))

    cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_key[:16])
    # The platform's scheme pads with PKCS#7 to a multiple of 32 bytes (not
    # the 16-byte AES block size), so receivers must accept 1..32 pad bytes.
    encrypted = cipher.encrypt(pad(raw, 32))
    return base64.b64encode(encrypted).decode()
def verify_url(msg_signature: str, timestamp: str, nonce: str, echostr: str) -> str:
    """Handle the GET verification request: check the signature, decrypt ``echostr``.

    Args:
        msg_signature: Signature received with the request.
        timestamp: ``timestamp`` value received with the request.
        nonce: ``nonce`` value received with the request.
        echostr: Encrypted echo string received with the request.

    Returns:
        The decrypted ``echostr`` message.

    Raises:
        ValueError: If the signature does not match.
    """
    # Reuse the shared helper rather than duplicating the sort/join/SHA-1
    # logic, so both callback paths verify signatures the same way.
    if not verify_signature(settings.callback_token, timestamp, nonce,
                            echostr, msg_signature):
        raise ValueError("签名验证失败")
    return decrypt(echostr)
def decrypt_message(xml_body: str, msg_signature: str = "",
                    timestamp: str = "", nonce: str = "") -> tuple[str, str]:
    """Handle a POST callback: verify the signature, decrypt, extract fields.

    The request body is XML carrying an ``Encrypt`` element. Its decrypted
    content is parsed as XML first and, if that fails, as JSON; in both cases
    the ``Token`` and ``OpenKfId`` fields are extracted.

    Args:
        xml_body: Raw XML request body.
        msg_signature: Signature received with the request.
        timestamp: ``timestamp`` value received with the request.
        nonce: ``nonce`` value received with the request.

    Returns:
        ``(token, open_kfid)``; a missing or empty field yields ``""``.

    Raises:
        ValueError: If ``Encrypt`` is missing, the signature does not match,
            or the decrypted content is neither XML nor a JSON object
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        xml.etree.ElementTree.ParseError: If ``xml_body`` is not valid XML.
    """
    import json

    # NOTE(review): xml.etree.ElementTree is not hardened against maliciously
    # constructed XML and xml_body is untrusted input -- consider a hardened
    # parser (e.g. defusedxml) if that dependency is acceptable.
    root = ET.fromstring(xml_body)
    encrypt_elem = root.find("Encrypt")
    if encrypt_elem is None or encrypt_elem.text is None:
        raise ValueError("XML 中缺少 Encrypt 字段")
    # Strip surrounding whitespace/newlines so Base64 decoding is reliable.
    encrypt_str = encrypt_elem.text.strip()

    # Verify as soon as ANY signature parameter is supplied: requiring all
    # three to be non-empty would let a request skip verification simply by
    # omitting one of them. Verification is skipped only when none is given,
    # which is kept for backward compatibility with callers passing no
    # signature data at all.
    if msg_signature or timestamp or nonce:
        if not verify_signature(settings.callback_token, timestamp, nonce,
                                encrypt_str, msg_signature):
            raise ValueError("消息签名验证失败")

    plain_text = decrypt(encrypt_str)

    # The decrypted content may be XML or JSON; try XML first. Only the parse
    # itself is guarded so unrelated errors are not mistaken for "not XML".
    try:
        plain_root = ET.fromstring(plain_text)
    except ET.ParseError:
        plain_root = None

    if plain_root is not None:
        token = plain_root.findtext("Token") or ""
        open_kfid = plain_root.findtext("OpenKfId") or ""
        return token, open_kfid

    data = json.loads(plain_text)
    if not isinstance(data, dict):
        raise ValueError("解密后的内容既不是 XML 也不是 JSON 对象")
    # Normalise missing/None values to "" to match the XML branch.
    return data.get("Token") or "", data.get("OpenKfId") or ""