python-wechat-kf/app/services/crypto.py

95 lines
3.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import hashlib
import hmac
import os
import struct
import xml.etree.ElementTree as ET

from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad

from app.config import settings
def _get_aes_key() -> bytes:
    """Return the AES key decoded from the configured EncodingAESKey.

    The configured value is expected to be 43 Base64 characters. A single
    "=" is appended to restore the Base64 padding before decoding, which
    yields 32 bytes for a 43-character value.
    """
    padded_key = settings.callback_aes_key + "="
    return base64.b64decode(padded_key)
def verify_signature(token: str, timestamp: str, nonce: str, encrypt_str: str,
                     msg_signature: str) -> bool:
    """Check a callback message signature.

    The expected signature is the SHA-1 hex digest of the four values
    sorted lexicographically and concatenated.

    Args:
        token: Callback token shared with the platform.
        timestamp: Timestamp value received with the request.
        nonce: Nonce value received with the request.
        encrypt_str: Encrypted payload (Base64 text) that was signed.
        msg_signature: Signature received with the request.

    Returns:
        True when ``msg_signature`` matches the locally computed signature,
        False otherwise (including when it is not a string).
    """
    if not isinstance(msg_signature, str):
        return False
    raw = "".join(sorted([token, timestamp, nonce, encrypt_str]))
    local_sig = hashlib.sha1(raw.encode()).hexdigest()
    # Compare in constant time so response timing does not reveal how many
    # leading characters of a forged signature were correct. Both sides are
    # compared as bytes because compare_digest rejects non-ASCII str input;
    # "replace" turns unencodable characters into "?", which can never match
    # a hex digest.
    return hmac.compare_digest(
        local_sig.encode("ascii"),
        msg_signature.encode("utf-8", "replace"),
    )
def decrypt(encrypt_str: str) -> str:
    """Decrypt a Base64-encoded AES-256-CBC callback payload.

    The decrypted plaintext is laid out as::

        random (16 bytes) + msg_len (4 bytes, network byte order) + msg + corpid

    Args:
        encrypt_str: Base64 text of the ciphertext.

    Returns:
        The ``msg`` portion of the plaintext, decoded as UTF-8.

    Raises:
        ValueError: If the padding is invalid or the declared message length
            exceeds the decrypted payload.
    """
    aes_key = _get_aes_key()
    # The IV is the first 16 bytes of the AES key.
    cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_key[:16])
    raw = base64.b64decode(encrypt_str)
    # The callback scheme pads with PKCS#7 to a multiple of 32 bytes, not the
    # 16-byte AES block size, so pad lengths of 17..32 are legitimate.
    plain = unpad(cipher.decrypt(raw), 32)
    # Skip the 16 random leading bytes; what follows is the length-prefixed
    # message and then the corpid.
    content = plain[16:]
    msg_len = struct.unpack("!I", content[:4])[0]
    msg_bytes = content[4:4 + msg_len]
    if len(msg_bytes) != msg_len:
        # Slicing would otherwise silently return a truncated message.
        raise ValueError("解密数据中的消息长度字段无效")
    # NOTE(review): the trailing corpid is not compared against the configured
    # one here - confirm whether callers rely on that check elsewhere.
    return msg_bytes.decode("utf-8")
def encrypt(plain_text: str) -> str:
    """Encrypt a message with AES-256-CBC and return Base64 text.

    The plaintext is assembled as::

        random (16 bytes) + msg_len (4 bytes, network byte order) + msg + corpid

    Args:
        plain_text: Message to encrypt; it is encoded as UTF-8.

    Returns:
        Base64 text of the ciphertext.
    """
    aes_key = _get_aes_key()
    msg_bytes = plain_text.encode("utf-8")
    raw = b"".join((
        os.urandom(16),
        struct.pack("!I", len(msg_bytes)),
        msg_bytes,
        settings.corpid.encode("utf-8"),
    ))
    # The IV is the first 16 bytes of the AES key.
    cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_key[:16])
    # The callback scheme pads with PKCS#7 to a multiple of 32 bytes, not the
    # 16-byte AES block size.
    encrypted = cipher.encrypt(pad(raw, 32))
    return base64.b64encode(encrypted).decode()
def verify_url(msg_signature: str, timestamp: str, nonce: str, echostr: str) -> str:
    """Handle the GET verification request: check the signature, decrypt echostr.

    Args:
        msg_signature: Signature received with the request.
        timestamp: Timestamp value received with the request.
        nonce: Nonce value received with the request.
        echostr: Encrypted echo string received with the request.

    Returns:
        The decrypted ``echostr``.

    Raises:
        ValueError: If the signature does not match.
    """
    # Delegate to verify_signature so the GET and POST paths share a single
    # implementation of the signature check.
    if not verify_signature(settings.callback_token, timestamp, nonce,
                            echostr, msg_signature):
        raise ValueError("签名验证失败")
    return decrypt(echostr)
def decrypt_message(xml_body: str) -> tuple[str, str]:
    """Handle the POST callback body: decrypt the XML envelope.

    The outer XML carries the ciphertext in ``<Encrypt>``; the decrypted
    payload is itself XML containing ``<Token>`` and ``<OpenKfId>`` among
    other fields.

    Args:
        xml_body: Raw XML text of the request body.

    Returns:
        ``(token, open_kfid)`` taken from the decrypted XML. Each value is an
        empty string when its element is absent or has no text.

    Raises:
        ValueError: If the outer XML has no ``<Encrypt>`` element or the
            element has no text.
    """
    # NOTE(review): the body appears to come from an external request, and the
    # stdlib XML parser is documented as not hardened against malicious
    # input - confirm this is acceptable for this endpoint.
    envelope = ET.fromstring(xml_body)
    encrypt_node = envelope.find("Encrypt")
    ciphertext = None if encrypt_node is None else encrypt_node.text
    if ciphertext is None:
        raise ValueError("XML 中缺少 Encrypt 字段")

    # Decrypt, then parse the inner XML document.
    payload = ET.fromstring(decrypt(ciphertext))

    # findtext gives "" for a missing element (the default) and for an element
    # that has no text.
    token = payload.findtext("Token", "")
    open_kfid = payload.findtext("OpenKfId", "")
    return token, open_kfid