import base64 import hashlib import struct import xml.etree.ElementTree as ET from Crypto.Cipher import AES from Crypto.Util.Padding import unpad from app.config import settings def _get_aes_key() -> bytes: """将 43 位 EncodingAESKey 解码为 32 字节 AES Key""" key = settings.callback_aes_key # 43 位 Base64 → 补齐 = 号 → 解码得 32 字节 return base64.b64decode(key + "=") def verify_signature(token: str, timestamp: str, nonce: str, encrypt_str: str, msg_signature: str) -> bool: """验证消息签名""" params = sorted([token, timestamp, nonce, encrypt_str]) raw = "".join(params) local_sig = hashlib.sha1(raw.encode()).hexdigest() return local_sig == msg_signature def decrypt(encrypt_str: str) -> str: """AES-256-CBC 解密""" aes_key = _get_aes_key() cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_key[:16]) raw = base64.b64decode(encrypt_str) plain = unpad(cipher.decrypt(raw), 16) # 去掉前 16 字节随机串 # 4 字节 msg_len(网络字节序)+ msg + corpid content = plain[16:] msg_len = struct.unpack("!I", content[:4])[0] msg = content[4:4 + msg_len].decode("utf-8") return msg def encrypt(plain_text: str) -> str: """AES-256-CBC 加密""" aes_key = _get_aes_key() import os random_bytes = os.urandom(16) msg_bytes = plain_text.encode("utf-8") msg_len = struct.pack("!I", len(msg_bytes)) corpid_bytes = settings.corpid.encode("utf-8") raw = random_bytes + msg_len + msg_bytes + corpid_bytes from Crypto.Util.Padding import pad cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_key[:16]) encrypted = cipher.encrypt(pad(raw, 16)) return base64.b64encode(encrypted).decode() def verify_url(msg_signature: str, timestamp: str, nonce: str, echostr: str) -> str: """GET 请求:验证签名 + 解密 echostr""" token = settings.callback_token # 验证签名 params = sorted([token, timestamp, nonce, echostr]) raw = "".join(params) local_sig = hashlib.sha1(raw.encode()).hexdigest() if local_sig != msg_signature: raise ValueError("签名验证失败") return decrypt(echostr) def decrypt_message(xml_body: str) -> tuple[str, str]: """POST 请求:解密 XML 消息,返回 (token, open_kfid) 注意:解密后的 XML 包含 等字段 """ root = ET.fromstring(xml_body) encrypt_elem = root.find("Encrypt") if encrypt_elem is None or encrypt_elem.text is None: raise ValueError("XML 中缺少 Encrypt 字段") encrypt_str = encrypt_elem.text # 解密 plain_text = decrypt(encrypt_str) # 解析解密后的 XML plain_root = ET.fromstring(plain_text) token = "" open_kfid = "" token_elem = plain_root.find("Token") kfid_elem = plain_root.find("OpenKfId") if token_elem is not None: token = token_elem.text or "" if kfid_elem is not None: open_kfid = kfid_elem.text or "" return token, open_kfid