init code

This commit is contained in:
zwt13703 2026-04-27 16:51:26 +08:00
parent c38e871e38
commit 6714dc9fe8
23 changed files with 938 additions and 0 deletions

View File

@ -0,0 +1,10 @@
{
"permissions": {
"allow": [
"Bash(git:*)",
"WebSearch",
"Bash(python3 -m pip install -r requirements.txt)",
"Bash(python3:*)"
]
}
}

6
.env.example Normal file
View File

@ -0,0 +1,6 @@
CORPID=ww9f866d5bf5175e77
SECRET=O8sMN29urjyetJB1kU8jy6NglF8eMmdz6UpSLpVL7No
OPEN_KFID=kfc0ec0f3746ebd8b95
CALLBACK_TOKEN=your_callback_token
CALLBACK_AES_KEY=your_43_char_aes_key
DATABASE_URL=postgresql+asyncpg://wechat_kf:wechat_kf@159.195.38.93:5432/wechat_kf

109
README.md
View File

@ -1,2 +1,111 @@
# python-wechat-kf # python-wechat-kf
微信客服 API 网页测试工具:监听用户消息、主动/被动发消息,消息存储到 PostgreSQL。
## 快速开始
### 1. 创建 PostgreSQL 数据库
```bash
createdb wechat_kf
```
### 2. 配置 .env
```bash
cp .env.example .env
```
编辑 `.env` 填入真实配置:
```env
# --- 企业微信基础信息(必填)---
# 企业 ID在企业微信管理后台 - 我的企业 页面底部查看
CORPID=ww9f866d5bf5175e77
# 应用 Secret在企业微信管理后台 - 微信客服 - 开发配置 中获取
SECRET=your_secret_here
# 客服账号 ID在企业微信管理后台 - 微信客服 - 客服账号 详情页查看
OPEN_KFID=wkxxxxxxxxxxxxx
# --- 回调配置(可选,仅接收实时消息时需要)---
# 自定义 3-32 位字符串,与后台配置的回调 Token 一致
CALLBACK_TOKEN=my_token_123
# 企业微信后台随机生成的 43 位 EncodingAESKey
CALLBACK_AES_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# --- 数据库配置 ---
# 格式: postgresql+asyncpg://用户名:密码@主机:端口/数据库名
DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/wechat_kf
```
**数据库连接字符串** 格式:
```
postgresql+asyncpg://用户名:密码@主机地址:端口/数据库名
```
如果需要修改用户名、密码、主机或数据库名,直接改这个连接字符串即可。
### 3. 安装依赖并启动
```bash
pip install -r requirements.txt
python run.py
```
访问 http://localhost:8000 即可看到聊天界面。
---
## CALLBACK_TOKEN 和 CALLBACK_AES_KEY 的使用
这两个参数用于 **微信回调消息的加解密**,只在需要接收实时消息推送时才需要配置。
### 配置流程
1. 登录 [企业微信管理后台](https://work.weixin.qq.com)
2. 进入 **微信客服** → 选中客服账号 → **开发配置**
3. 在回调配置中:
- **Token**:自定义填一个 3-32 位字符串(如 `my_token_123`),同时写入 `.env``CALLBACK_TOKEN`
- **EncodingAESKey**:点击"随机生成",得到一个 **43 位字符串**,填入 `.env``CALLBACK_AES_KEY`
- **回调 URL**:填写 `https://你的域名/webhook`(见下方本地测试方案)
4. 保存后微信会立刻发送 GET 请求到回调 URL 进行验证
### 如果不配回调
不配回调不影响以下功能:
- 点击页面的 **"同步拉取"** 按钮主动轮询获取消息
- 在页面中 **发送消息** 给客户
---
## 本地测试方案
回调模式要求 URL 必须是 **公网 HTTPS**。本地开发可以用以下工具:
### ngrok推荐
```bash
# 安装 ngrok 并启动
ngrok http 8000
# 会得到一个公网 URL例如 https://abc123.ngrok.io
# 在微信管理后台填入 https://abc123.ngrok.io/webhook 即可
```
### 其他方案
| 方案 | 说明 |
|------|------|
| **ngrok** | 免费,一行命令,推荐 |
| **frp** | 需要一台有公网 IP 的服务器 |
| **localhost.run** | `ssh -R 80:localhost:8000 localhost.run` |
| **部署到公网服务器** | 直接部署到有 HTTPS 的服务器 |
### 不配回调也能测试
即使没有公网地址,也可以:
1. 启动应用 → 打开页面 → 点击 **"同步拉取"** 获取历史消息
2. 在输入框输入内容 → 点击 **"发送"** 主动给客户发消息
3. 这是**轮询模式**,不需要公网、不需要回调配置

0
app/__init__.py Normal file
View File

15
app/config.py Normal file
View File

@ -0,0 +1,15 @@
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
corpid: str = ""
secret: str = ""
open_kfid: str = ""
callback_token: str = ""
callback_aes_key: str = ""
database_url: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/wechat_kf"
model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
settings = Settings()

16
app/database.py Normal file
View File

@ -0,0 +1,16 @@
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from app.config import settings
engine = create_async_engine(settings.database_url, echo=False)
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_db() -> AsyncSession:
async with async_session() as session:
yield session
async def init_db():
from app.models.message import Base
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)

24
app/main.py Normal file
View File

@ -0,0 +1,24 @@
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from pathlib import Path
from app.database import init_db
from app.routes import webhook, api, pages
@asynccontextmanager
async def lifespan(app: FastAPI):
await init_db()
yield
app = FastAPI(title="微信客服 API 测试工具", lifespan=lifespan)
# 静态文件
static_dir = Path(__file__).parent / "static"
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
# 路由
app.include_router(pages.router)
app.include_router(webhook.router)
app.include_router(api.router)

0
app/models/__init__.py Normal file
View File

34
app/models/message.py Normal file
View File

@ -0,0 +1,34 @@
from datetime import datetime
from sqlalchemy import Integer, String, Text, DateTime, Index
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.sql import func
class Base(DeclarativeBase):
pass
class Message(Base):
__tablename__ = "messages"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
msg_id: Mapped[str] = mapped_column(String(128), unique=True, nullable=False)
open_kfid: Mapped[str] = mapped_column(String(128), nullable=False)
external_userid: Mapped[str] = mapped_column(String(128), nullable=False)
servicer_userid: Mapped[str | None] = mapped_column(String(128))
send_time: Mapped[datetime] = mapped_column(DateTime, nullable=False)
msgtype: Mapped[str] = mapped_column(String(32), nullable=False)
origin: Mapped[str] = mapped_column(String(16), nullable=False)
content: Mapped[str | None] = mapped_column(Text)
raw_data: Mapped[dict | None] = mapped_column(JSONB)
direction: Mapped[str] = mapped_column(String(16), default="inbound")
status: Mapped[str] = mapped_column(String(32), default="received")
created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now())
__table_args__ = (
Index("idx_messages_external_userid", "external_userid"),
Index("idx_messages_send_time", "send_time"),
Index("idx_messages_open_kfid", "open_kfid"),
)

0
app/routes/__init__.py Normal file
View File

79
app/routes/api.py Normal file
View File

@ -0,0 +1,79 @@
from fastapi import APIRouter, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel
from app.database import get_db
from app.services.message_service import (
get_conversations,
get_messages,
send_and_save,
save_messages,
)
from app.services.wechat_client import sync_msg
from app.config import settings
router = APIRouter(prefix="/api")
class SendMsgRequest(BaseModel):
external_userid: str
content: str
open_kfid: str = ""
class SyncRequest(BaseModel):
cursor: str = ""
token: str = ""
limit: int = 100
open_kfid: str = ""
@router.get("/conversations")
async def list_conversations(
open_kfid: str = Query(""),
db: AsyncSession = Depends(get_db),
):
"""获取会话列表"""
conversations = await get_conversations(db, open_kfid)
return {"conversations": conversations}
@router.get("/messages")
async def list_messages(
external_userid: str = Query(...),
open_kfid: str = Query(""),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=200),
db: AsyncSession = Depends(get_db),
):
"""获取某客户的消息列表"""
messages = await get_messages(db, external_userid, open_kfid, page, page_size)
return {"messages": messages}
@router.post("/send")
async def send_message(
req: SendMsgRequest,
db: AsyncSession = Depends(get_db),
):
"""发送消息"""
result = await send_and_save(db, req.external_userid, req.content, req.open_kfid)
return result
@router.post("/sync")
async def sync_messages(
req: SyncRequest,
db: AsyncSession = Depends(get_db),
):
"""手动拉取消息(轮询模式)"""
kfid = req.open_kfid or settings.open_kfid
result = await sync_msg(kfid, req.cursor, req.token, req.limit)
msg_list = result.get("msg_list", [])
saved = 0
if msg_list:
saved = await save_messages(db, msg_list)
return {
**result,
"saved": saved,
"total": len(msg_list),
}

13
app/routes/pages.py Normal file
View File

@ -0,0 +1,13 @@
from fastapi import APIRouter, Request
from fastapi.templating import Jinja2Templates
from pathlib import Path
templates_dir = Path(__file__).parent.parent / "templates"
templates = Jinja2Templates(directory=str(templates_dir))
router = APIRouter()
@router.get("/")
async def index(request: Request):
return templates.TemplateResponse("index.html", {"request": request})

54
app/routes/webhook.py Normal file
View File

@ -0,0 +1,54 @@
from fastapi import APIRouter, Request, Query, Depends
from fastapi.responses import PlainTextResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.services.crypto import verify_url, decrypt_message
from app.services.wechat_client import sync_msg
from app.services.message_service import save_messages
from app.config import settings
import logging
logger = logging.getLogger(__name__)
router = APIRouter()
@router.get("/webhook")
async def verify_callback(
msg_signature: str = Query(..., alias="msg_signature"),
timestamp: str = Query(...),
nonce: str = Query(...),
echostr: str = Query(...),
):
"""URL 验证:解密 echostr 并返回明文"""
try:
plain = verify_url(msg_signature, timestamp, nonce, echostr)
return PlainTextResponse(plain)
except Exception as e:
logger.error(f"URL 验证失败: {e}")
return PlainTextResponse("verification failed", status_code=400)
@router.post("/webhook")
async def receive_callback(request: Request, db: AsyncSession = Depends(get_db)):
"""接收消息回调:解密 XML → 拉取消息 → 入库"""
try:
xml_body = await request.body()
xml_str = xml_body.decode("utf-8")
logger.info(f"收到回调: {xml_str[:200]}")
token, open_kfid = decrypt_message(xml_str)
if not token or not open_kfid:
logger.warning("解密后 Token 或 OpenKfId 为空")
return PlainTextResponse("fail")
# 拉取消息
result = await sync_msg(open_kfid, token=token, limit=100)
msg_list = result.get("msg_list", [])
if msg_list:
saved = await save_messages(db, msg_list)
logger.info(f"回调拉取消息 {len(msg_list)} 条,新增入库 {saved}")
return PlainTextResponse("success")
except Exception as e:
logger.error(f"回调处理失败: {e}")
return PlainTextResponse("fail")

0
app/services/__init__.py Normal file
View File

94
app/services/crypto.py Normal file
View File

@ -0,0 +1,94 @@
import base64
import hashlib
import struct
import xml.etree.ElementTree as ET
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from app.config import settings
def _get_aes_key() -> bytes:
"""将 43 位 EncodingAESKey 解码为 32 字节 AES Key"""
key = settings.callback_aes_key
# 43 位 Base64 → 补齐 = 号 → 解码得 32 字节
return base64.b64decode(key + "=")
def verify_signature(token: str, timestamp: str, nonce: str, encrypt_str: str,
msg_signature: str) -> bool:
"""验证消息签名"""
params = sorted([token, timestamp, nonce, encrypt_str])
raw = "".join(params)
local_sig = hashlib.sha1(raw.encode()).hexdigest()
return local_sig == msg_signature
def decrypt(encrypt_str: str) -> str:
"""AES-256-CBC 解密"""
aes_key = _get_aes_key()
cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_key[:16])
raw = base64.b64decode(encrypt_str)
plain = unpad(cipher.decrypt(raw), 16)
# 去掉前 16 字节随机串
# 4 字节 msg_len网络字节序+ msg + corpid
content = plain[16:]
msg_len = struct.unpack("!I", content[:4])[0]
msg = content[4:4 + msg_len].decode("utf-8")
return msg
def encrypt(plain_text: str) -> str:
"""AES-256-CBC 加密"""
aes_key = _get_aes_key()
import os
random_bytes = os.urandom(16)
msg_bytes = plain_text.encode("utf-8")
msg_len = struct.pack("!I", len(msg_bytes))
corpid_bytes = settings.corpid.encode("utf-8")
raw = random_bytes + msg_len + msg_bytes + corpid_bytes
from Crypto.Util.Padding import pad
cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_key[:16])
encrypted = cipher.encrypt(pad(raw, 16))
return base64.b64encode(encrypted).decode()
def verify_url(msg_signature: str, timestamp: str, nonce: str, echostr: str) -> str:
"""GET 请求:验证签名 + 解密 echostr"""
token = settings.callback_token
# 验证签名
params = sorted([token, timestamp, nonce, echostr])
raw = "".join(params)
local_sig = hashlib.sha1(raw.encode()).hexdigest()
if local_sig != msg_signature:
raise ValueError("签名验证失败")
return decrypt(echostr)
def decrypt_message(xml_body: str) -> tuple[str, str]:
"""POST 请求:解密 XML 消息,返回 (token, open_kfid)
注意解密后的 XML 包含 <Token> <OpenKfId> 等字段
"""
root = ET.fromstring(xml_body)
encrypt_elem = root.find("Encrypt")
if encrypt_elem is None or encrypt_elem.text is None:
raise ValueError("XML 中缺少 Encrypt 字段")
encrypt_str = encrypt_elem.text
# 解密
plain_text = decrypt(encrypt_str)
# 解析解密后的 XML
plain_root = ET.fromstring(plain_text)
token = ""
open_kfid = ""
token_elem = plain_root.find("Token")
kfid_elem = plain_root.find("OpenKfId")
if token_elem is not None:
token = token_elem.text or ""
if kfid_elem is not None:
open_kfid = kfid_elem.text or ""
return token, open_kfid

View File

@ -0,0 +1,141 @@
from datetime import datetime
from sqlalchemy import select, func, desc, and_
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.message import Message
from app.services.wechat_client import send_msg
from app.config import settings
async def save_messages(db: AsyncSession, msg_list: list[dict]) -> int:
"""批量保存消息,按 msg_id 去重,返回新增数量"""
saved = 0
for msg in msg_list:
stmt = pg_insert(Message).values(
msg_id=msg.get("msgid") or msg.get("msg_id", ""),
open_kfid=msg.get("open_kfid", settings.open_kfid),
external_userid=msg.get("external_userid", ""),
servicer_userid=msg.get("servicer_userid"),
send_time=datetime.fromtimestamp(msg.get("send_time", 0)),
msgtype=msg.get("msgtype", "unknown"),
origin=msg.get("origin", "customer"),
content=_extract_text_content(msg),
raw_data=msg,
direction="inbound" if msg.get("origin") != "servicer" else "outbound",
status="received",
).on_conflict_do_nothing(index_elements=["msg_id"])
result = await db.execute(stmt)
if result.rowcount:
saved += 1
await db.commit()
return saved
async def get_conversations(db: AsyncSession, open_kfid: str = "", limit: int = 50) -> list[dict]:
"""获取会话列表:按 external_userid 分组,显示最新消息"""
kfid = open_kfid or settings.open_kfid
# 子查询:每个客户的最新消息
subq = (
select(
Message.external_userid,
func.max(Message.send_time).label("latest_time")
)
.where(Message.open_kfid == kfid)
.group_by(Message.external_userid)
.order_by(desc("latest_time"))
.limit(limit)
.subquery()
)
q = (
select(Message)
.join(subq, and_(
Message.external_userid == subq.c.external_userid,
Message.send_time == subq.c.latest_time
))
.where(Message.open_kfid == kfid)
.order_by(desc(Message.send_time))
)
result = await db.execute(q)
rows = result.scalars().all()
return [
{
"external_userid": r.external_userid,
"latest_time": r.send_time.isoformat(),
"latest_content": r.content or "",
"latest_msgtype": r.msgtype,
}
for r in rows
]
async def get_messages(db: AsyncSession, external_userid: str,
open_kfid: str = "", page: int = 1,
page_size: int = 50) -> list[dict]:
"""获取某客户的消息列表(时间升序)"""
kfid = open_kfid or settings.open_kfid
offset = (page - 1) * page_size
q = (
select(Message)
.where(
Message.open_kfid == kfid,
Message.external_userid == external_userid,
)
.order_by(Message.send_time.asc())
.offset(offset)
.limit(page_size)
)
result = await db.execute(q)
rows = result.scalars().all()
return [
{
"msg_id": r.msg_id,
"content": r.content,
"msgtype": r.msgtype,
"send_time": r.send_time.isoformat(),
"origin": r.origin,
"direction": r.direction,
}
for r in rows
]
async def send_and_save(db: AsyncSession, external_userid: str, content: str,
open_kfid: str = "") -> dict:
"""发送消息并记录到数据库,返回发送结果"""
kfid = open_kfid or settings.open_kfid
result = await send_msg(
touser=external_userid,
open_kfid=kfid,
msgtype="text",
content=content,
)
errcode = result.get("errcode", -1)
status = "sent" if errcode == 0 else "failed"
# 记录到数据库
msg = Message(
msg_id=result.get("msgid", f"out_{int(datetime.now().timestamp())}"),
open_kfid=kfid,
external_userid=external_userid,
servicer_userid="",
send_time=datetime.now(),
msgtype="text",
origin="servicer",
content=content,
raw_data=result,
direction="outbound",
status=status,
)
db.add(msg)
await db.commit()
return result
def _extract_text_content(msg: dict) -> str:
"""从消息中提取文本内容"""
if msg.get("msgtype") == "text":
text_data = msg.get("text", {})
return text_data.get("content", "") if isinstance(text_data, dict) else str(text_data)
return ""

View File

@ -0,0 +1,34 @@
import time
import asyncio
import httpx
from app.config import settings
# token 缓存
_token_cache: dict = {}
_lock = asyncio.Lock()
async def get_access_token() -> str:
"""获取 access_token带缓存和并发刷新保护"""
now = int(time.time())
if _token_cache.get("access_token") and _token_cache.get("expires_at", 0) > now + 300:
return _token_cache["access_token"]
async with _lock:
# 双重检查
if _token_cache.get("access_token") and _token_cache.get("expires_at", 0) > now + 300:
return _token_cache["access_token"]
async with httpx.AsyncClient() as client:
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
resp = await client.get(url, params={
"corpid": settings.corpid,
"corpsecret": settings.secret,
})
data = resp.json()
if data.get("errcode") != 0:
raise Exception(f"获取 access_token 失败: {data}")
_token_cache["access_token"] = data["access_token"]
_token_cache["expires_at"] = now + data["expires_in"]
return _token_cache["access_token"]

View File

@ -0,0 +1,40 @@
import httpx
from app.services.token_manager import get_access_token
WECHAT_API_BASE = "https://qyapi.weixin.qq.com"
async def send_msg(touser: str, open_kfid: str, msgtype: str, content: str) -> dict:
"""主动发送消息给客户"""
token = await get_access_token()
url = f"{WECHAT_API_BASE}/cgi-bin/kf/send_msg?access_token={token}"
body = {
"touser": touser,
"open_kfid": open_kfid,
"msgtype": msgtype,
}
if msgtype == "text":
body["text"] = {"content": content}
async with httpx.AsyncClient() as client:
resp = await client.post(url, json=body)
return resp.json()
async def sync_msg(open_kfid: str, cursor: str = "", token: str = "", limit: int = 100) -> dict:
"""拉取客服消息(轮询模式)"""
access_token = await get_access_token()
url = f"{WECHAT_API_BASE}/cgi-bin/kf/sync_msg?access_token={access_token}"
body = {
"cursor": cursor,
"token": token,
"limit": limit,
"open_kfid": open_kfid,
}
async with httpx.AsyncClient() as client:
resp = await client.post(url, json=body)
return resp.json()

162
app/static/app.js Normal file
View File

@ -0,0 +1,162 @@
// 状态
let currentUser = "";
let conversations = [];
// 初始化
document.addEventListener("DOMContentLoaded", () => {
loadConversations();
});
// Toast
function showToast(msg, type = "success") {
const el = document.getElementById("toast");
el.textContent = msg;
el.className = "toast toast-" + type;
el.style.display = "block";
el.style.opacity = "1";
setTimeout(() => { el.style.opacity = "0"; setTimeout(() => el.style.display = "none", 300); }, 2000);
}
// 加载会话列表
async function loadConversations() {
try {
const resp = await fetch("/api/conversations");
const data = await resp.json();
conversations = data.conversations || [];
renderConversations();
if (conversations.length === 0) {
showToast("暂无会话,请先点击"同步拉取"获取消息", "error");
}
} catch (e) {
showToast("加载会话失败: " + e.message, "error");
}
}
// 渲染会话列表
function renderConversations() {
const list = document.getElementById("conversationList");
if (conversations.length === 0) {
list.innerHTML = '<div style="padding:20px;color:#999;text-align:center;">暂无会话</div>';
return;
}
list.innerHTML = conversations.map(c => `
<div class="conv-item ${c.external_userid === currentUser ? 'active' : ''}"
onclick="selectConversation('${c.external_userid}')">
<div class="userid">${c.external_userid}</div>
<div class="preview">${escapeHtml(c.latest_content || '[非文本消息]')}</div>
<div class="time">${formatTime(c.latest_time)}</div>
</div>
`).join("");
}
// 选择会话
async function selectConversation(userid) {
currentUser = userid;
document.getElementById("msgHeader").textContent = "客户: " + userid;
document.getElementById("sendBtn").disabled = false;
document.getElementById("msgInput").disabled = false;
renderConversations();
await loadMessages(userid);
}
// 加载消息
async function loadMessages(userid) {
try {
const resp = await fetch(`/api/messages?external_userid=${userid}`);
const data = await resp.json();
const msgs = data.messages || [];
renderMessages(msgs);
} catch (e) {
showToast("加载消息失败: " + e.message, "error");
}
}
// 渲染消息
function renderMessages(msgs) {
const container = document.getElementById("messageList");
if (msgs.length === 0) {
container.innerHTML = '<div class="empty-state">暂无消息</div>';
return;
}
container.innerHTML = msgs.map(m => `
<div class="msg-bubble ${m.direction === 'outbound' ? 'msg-outbound' : 'msg-inbound'}">
<div>${escapeHtml(m.content || `[${m.msgtype}]`)}</div>
<div class="msg-time">${formatTime(m.send_time)}</div>
</div>
`).join("");
container.scrollTop = container.scrollHeight;
}
// 发送消息
async function sendMessage() {
if (!currentUser) return;
const input = document.getElementById("msgInput");
const content = input.value.trim();
if (!content) return;
const btn = document.getElementById("sendBtn");
btn.disabled = true;
btn.textContent = "发送中...";
try {
const resp = await fetch("/api/send", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ external_userid: currentUser, content }),
});
const result = await resp.json();
if (result.errcode === 0) {
input.value = "";
showToast("发送成功");
await loadMessages(currentUser);
} else {
showToast("发送失败: " + (result.errmsg || JSON.stringify(result)), "error");
}
} catch (e) {
showToast("发送失败: " + e.message, "error");
} finally {
btn.disabled = false;
btn.textContent = "发送";
}
}
// 同步拉取
async function syncMessages() {
const statusEl = document.getElementById("syncStatus");
statusEl.textContent = "同步中...";
try {
const resp = await fetch("/api/sync", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({}),
});
const data = await resp.json();
if (data.errcode === 0) {
showToast(`同步成功,获取 ${data.total} 条消息,新增入库 ${data.saved}`);
await loadConversations();
if (currentUser) await loadMessages(currentUser);
} else {
showToast("同步失败: " + (data.errmsg || JSON.stringify(data)), "error");
}
} catch (e) {
showToast("同步失败: " + e.message, "error");
} finally {
statusEl.textContent = "";
}
}
// 格式化时间
function formatTime(iso) {
if (!iso) return "";
const d = new Date(iso);
const pad = (n) => String(n).padStart(2, "0");
return `${d.getFullYear()}-${pad(d.getMonth() + 1)}-${pad(d.getDate())} ${pad(d.getHours())}:${pad(d.getMinutes())}`;
}
// HTML 转义
function escapeHtml(text) {
const div = document.createElement("div");
div.textContent = text;
return div.innerHTML;
}

76
app/templates/index.html Normal file
View File

@ -0,0 +1,76 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>微信客服 - 消息测试</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body { font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif; height: 100vh; display: flex; }
/* 侧栏 */
.sidebar {
width: 320px; border-right: 1px solid #e0e0e0; display: flex; flex-direction: column; background: #f5f5f5;
}
.sidebar-header { padding: 16px; border-bottom: 1px solid #e0e0e0; background: #fff; }
.sidebar-header h2 { font-size: 18px; margin-bottom: 8px; }
.sidebar-header button { padding: 6px 16px; background: #07c160; color: #fff; border: none; border-radius: 4px; cursor: pointer; font-size: 14px; }
.sidebar-header button:hover { background: #06ad56; }
.conversation-list { flex: 1; overflow-y: auto; }
.conv-item {
padding: 14px 16px; cursor: pointer; border-bottom: 1px solid #e8e8e8; transition: background .15s;
}
.conv-item:hover { background: #ececec; }
.conv-item.active { background: #d9d9d9; }
.conv-item .userid { font-size: 14px; font-weight: 500; color: #333; }
.conv-item .preview { font-size: 13px; color: #888; margin-top: 4px; overflow: hidden; text-overflow: ellipsis; white-space: nowrap; }
.conv-item .time { font-size: 12px; color: #aaa; margin-top: 2px; }
/* 消息区域 */
.main { flex: 1; display: flex; flex-direction: column; }
.msg-header {
padding: 16px; border-bottom: 1px solid #e0e0e0; font-size: 16px; font-weight: 500; background: #fff;
}
.msg-list { flex: 1; overflow-y: auto; padding: 16px; background: #ededed; display: flex; flex-direction: column; gap: 12px; }
.msg-bubble {
max-width: 70%; padding: 10px 14px; border-radius: 8px; font-size: 15px; line-height: 1.5; word-break: break-word;
}
.msg-inbound { align-self: flex-start; background: #fff; color: #333; }
.msg-outbound { align-self: flex-end; background: #95ec69; color: #000; }
.msg-time { font-size: 11px; color: #aaa; margin-top: 4px; }
.empty-state { flex: 1; display: flex; align-items: center; justify-content: center; color: #999; font-size: 15px; }
/* 输入区 */
.input-area { padding: 16px; border-top: 1px solid #e0e0e0; background: #fff; display: flex; gap: 10px; }
.input-area input { flex: 1; padding: 10px 14px; border: 1px solid #ddd; border-radius: 6px; font-size: 15px; outline: none; }
.input-area input:focus { border-color: #07c160; }
.input-area button { padding: 10px 24px; background: #07c160; color: #fff; border: none; border-radius: 6px; cursor: pointer; font-size: 15px; }
.input-area button:hover { background: #06ad56; }
.input-area button:disabled { background: #aaa; cursor: not-allowed; }
.toast { position: fixed; top: 20px; left: 50%; transform: translateX(-50%); padding: 10px 20px; border-radius: 6px; color: #fff; font-size: 14px; z-index: 999; transition: opacity .3s; }
.toast-success { background: #07c160; }
.toast-error { background: #f44336; }
</style>
</head>
<body>
<!-- 侧栏 -->
<div class="sidebar">
<div class="sidebar-header">
<h2>会话列表</h2>
<button onclick="syncMessages()">同步拉取</button>
<span id="syncStatus" style="font-size:12px;color:#888;margin-left:8px;"></span>
</div>
<div class="conversation-list" id="conversationList"></div>
</div>
<!-- 消息区 -->
<div class="main">
<div class="msg-header" id="msgHeader">请选择会话</div>
<div class="msg-list" id="messageList">
<div class="empty-state">选择左侧会话查看消息</div>
</div>
<div class="input-area">
<input type="text" id="msgInput" placeholder="输入消息..." onkeydown="if(event.key==='Enter') sendMessage()">
<button id="sendBtn" onclick="sendMessage()" disabled>发送</button>
</div>
</div>
<div class="toast" id="toast" style="display:none;"></div>
<script src="/static/app.js"></script>
</body>
</html>

View File

@ -0,0 +1,17 @@
# 任务执行摘要
## 会话 ID: 1
- [2026-04-27 16:10:00]
- **执行原因**: 实现微信客服 API 网页测试应用,支持监听用户消息、主动/被动发送消息、消息存储到 PostgreSQL
- **执行过程**:
1. 创建项目目录结构app/models, app/services, app/routes, app/static, app/templates
2. 实现配置层config.pypydantic-settings 读取 .env和 .env.example 模板。
3. 实现数据库层database.pyAsyncEngine + session和 models/message.pyMessage ORM
4. 实现微信 API 客户端token_manager.pyaccess_token 内存缓存 + asyncio.Lock和 wechat_client.pysend_msg / sync_msg
5. 实现回调加解密crypto.py签名验证、AES-256-CBC 解密、URL 验证、消息解密)。
6. 实现路由层webhook.pyGET 验证 / POST 接收回调、api.py会话列表/消息查询/发送/同步、pages.pyJinja2 模板渲染)。
7. 实现消息业务层message_service.py消息保存去重、会话列表、消息分页、发送并记录
8. 实现前端界面index.html左侧会话列表 + 右侧消息气泡 + 底部输入框)和 app.js原生 JS 交互)。
9. 实现主入口main.pyFastAPI 挂载路由 + 静态文件 + lifespan 建表)和 run.pyuvicorn 启动)。
10. 安装依赖并验证完整导入链和路由注册。
- **执行结果**: 项目全部 19 个源文件创建完成,导入链和路由验证通过。用户需创建 .env 文件配置微信参数后即可 `python run.py` 启动。

9
requirements.txt Normal file
View File

@ -0,0 +1,9 @@
fastapi>=0.110.0
uvicorn[standard]>=0.29.0
sqlalchemy[asyncio]>=2.0.30
asyncpg>=0.29.0
httpx>=0.27.0
pycryptodome>=3.20.0
pydantic-settings>=2.2.0
jinja2>=3.1.3
python-multipart>=0.0.9

5
run.py Normal file
View File

@ -0,0 +1,5 @@
#!/usr/bin/env python
import uvicorn
if __name__ == "__main__":
uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)