diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..1a14819 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,10 @@ +{ + "permissions": { + "allow": [ + "Bash(git:*)", + "WebSearch", + "Bash(python3 -m pip install -r requirements.txt)", + "Bash(python3:*)" + ] + } +} diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..845b3f8 --- /dev/null +++ b/.env.example @@ -0,0 +1,6 @@ +CORPID=ww9f866d5bf5175e77 +SECRET=O8sMN29urjyetJB1kU8jy6NglF8eMmdz6UpSLpVL7No +OPEN_KFID=kfc0ec0f3746ebd8b95 +CALLBACK_TOKEN=your_callback_token +CALLBACK_AES_KEY=your_43_char_aes_key +DATABASE_URL=postgresql+asyncpg://wechat_kf:wechat_kf@159.195.38.93:5432/wechat_kf diff --git a/README.md b/README.md index 5c302c3..417c554 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,111 @@ # python-wechat-kf +微信客服 API 网页测试工具:监听用户消息、主动/被动发消息,消息存储到 PostgreSQL。 + +## 快速开始 + +### 1. 创建 PostgreSQL 数据库 + +```bash +createdb wechat_kf +``` + +### 2. 配置 .env + +```bash +cp .env.example .env +``` + +编辑 `.env` 填入真实配置: + +```env +# --- 企业微信基础信息(必填)--- +# 企业 ID,在企业微信管理后台 - 我的企业 页面底部查看 +CORPID=ww9f866d5bf5175e77 + +# 应用 Secret,在企业微信管理后台 - 微信客服 - 开发配置 中获取 +SECRET=your_secret_here + +# 客服账号 ID,在企业微信管理后台 - 微信客服 - 客服账号 详情页查看 +OPEN_KFID=wkxxxxxxxxxxxxx + +# --- 回调配置(可选,仅接收实时消息时需要)--- +# 自定义 3-32 位字符串,与后台配置的回调 Token 一致 +CALLBACK_TOKEN=my_token_123 + +# 企业微信后台随机生成的 43 位 EncodingAESKey +CALLBACK_AES_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + +# --- 数据库配置 --- +# 格式: postgresql+asyncpg://用户名:密码@主机:端口/数据库名 +DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/wechat_kf +``` + +**数据库连接字符串** 格式: +``` +postgresql+asyncpg://用户名:密码@主机地址:端口/数据库名 +``` +如果需要修改用户名、密码、主机或数据库名,直接改这个连接字符串即可。 + +### 3. 安装依赖并启动 + +```bash +pip install -r requirements.txt +python run.py +``` + +访问 http://localhost:8000 即可看到聊天界面。 + +--- + +## CALLBACK_TOKEN 和 CALLBACK_AES_KEY 的使用 + +这两个参数用于 **微信回调消息的加解密**,只在需要接收实时消息推送时才需要配置。 + +### 配置流程 + +1. 登录 [企业微信管理后台](https://work.weixin.qq.com) +2. 进入 **微信客服** → 选中客服账号 → **开发配置** +3. 在回调配置中: + - **Token**:自定义填一个 3-32 位字符串(如 `my_token_123`),同时写入 `.env` 的 `CALLBACK_TOKEN` + - **EncodingAESKey**:点击"随机生成",得到一个 **43 位字符串**,填入 `.env` 的 `CALLBACK_AES_KEY` + - **回调 URL**:填写 `https://你的域名/webhook`(见下方本地测试方案) +4. 保存后微信会立刻发送 GET 请求到回调 URL 进行验证 + +### 如果不配回调 + +不配回调不影响以下功能: +- 点击页面的 **"同步拉取"** 按钮主动轮询获取消息 +- 在页面中 **发送消息** 给客户 + +--- + +## 本地测试方案 + +回调模式要求 URL 必须是 **公网 HTTPS**。本地开发可以用以下工具: + +### ngrok(推荐) + +```bash +# 安装 ngrok 并启动 +ngrok http 8000 + +# 会得到一个公网 URL,例如 https://abc123.ngrok.io +# 在微信管理后台填入 https://abc123.ngrok.io/webhook 即可 +``` + +### 其他方案 + +| 方案 | 说明 | +|------|------| +| **ngrok** | 免费,一行命令,推荐 | +| **frp** | 需要一台有公网 IP 的服务器 | +| **localhost.run** | `ssh -R 80:localhost:8000 localhost.run` | +| **部署到公网服务器** | 直接部署到有 HTTPS 的服务器 | + +### 不配回调也能测试 + +即使没有公网地址,也可以: +1. 启动应用 → 打开页面 → 点击 **"同步拉取"** 获取历史消息 +2. 在输入框输入内容 → 点击 **"发送"** 主动给客户发消息 +3. 这是**轮询模式**,不需要公网、不需要回调配置 diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/config.py b/app/config.py new file mode 100644 index 0000000..3d80610 --- /dev/null +++ b/app/config.py @@ -0,0 +1,15 @@ +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + corpid: str = "" + secret: str = "" + open_kfid: str = "" + callback_token: str = "" + callback_aes_key: str = "" + database_url: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/wechat_kf" + + model_config = {"env_file": ".env", "env_file_encoding": "utf-8"} + + +settings = Settings() diff --git a/app/database.py b/app/database.py new file mode 100644 index 0000000..0916a54 --- /dev/null +++ b/app/database.py @@ -0,0 +1,16 @@ +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine +from app.config import settings + +engine = create_async_engine(settings.database_url, echo=False) +async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + +async def get_db() -> AsyncSession: + async with async_session() as session: + yield session + + +async def init_db(): + from app.models.message import Base + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..3e9b1fc --- /dev/null +++ b/app/main.py @@ -0,0 +1,24 @@ +from contextlib import asynccontextmanager +from fastapi import FastAPI +from fastapi.staticfiles import StaticFiles +from pathlib import Path +from app.database import init_db +from app.routes import webhook, api, pages + + +@asynccontextmanager +async def lifespan(app: FastAPI): + await init_db() + yield + + +app = FastAPI(title="微信客服 API 测试工具", lifespan=lifespan) + +# 静态文件 +static_dir = Path(__file__).parent / "static" +app.mount("/static", StaticFiles(directory=str(static_dir)), name="static") + +# 路由 +app.include_router(pages.router) +app.include_router(webhook.router) +app.include_router(api.router) diff --git a/app/models/__init__.py b/app/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/models/message.py b/app/models/message.py new file mode 100644 index 0000000..5be2456 --- /dev/null +++ b/app/models/message.py @@ -0,0 +1,34 @@ +from datetime import datetime +from sqlalchemy import Integer, String, Text, DateTime, Index +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column +from sqlalchemy.sql import func + + +class Base(DeclarativeBase): + pass + + +class Message(Base): + __tablename__ = "messages" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + msg_id: Mapped[str] = mapped_column(String(128), unique=True, nullable=False) + open_kfid: Mapped[str] = mapped_column(String(128), nullable=False) + external_userid: Mapped[str] = mapped_column(String(128), nullable=False) + servicer_userid: Mapped[str | None] = mapped_column(String(128)) + send_time: Mapped[datetime] = mapped_column(DateTime, nullable=False) + msgtype: Mapped[str] = mapped_column(String(32), nullable=False) + origin: Mapped[str] = mapped_column(String(16), nullable=False) + content: Mapped[str | None] = mapped_column(Text) + raw_data: Mapped[dict | None] = mapped_column(JSONB) + direction: Mapped[str] = mapped_column(String(16), default="inbound") + status: Mapped[str] = mapped_column(String(32), default="received") + created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now()) + updated_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now()) + + __table_args__ = ( + Index("idx_messages_external_userid", "external_userid"), + Index("idx_messages_send_time", "send_time"), + Index("idx_messages_open_kfid", "open_kfid"), + ) diff --git a/app/routes/__init__.py b/app/routes/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/routes/api.py b/app/routes/api.py new file mode 100644 index 0000000..b2b8088 --- /dev/null +++ b/app/routes/api.py @@ -0,0 +1,79 @@ +from fastapi import APIRouter, Depends, Query +from sqlalchemy.ext.asyncio import AsyncSession +from pydantic import BaseModel +from app.database import get_db +from app.services.message_service import ( + get_conversations, + get_messages, + send_and_save, + save_messages, +) +from app.services.wechat_client import sync_msg +from app.config import settings + +router = APIRouter(prefix="/api") + + +class SendMsgRequest(BaseModel): + external_userid: str + content: str + open_kfid: str = "" + + +class SyncRequest(BaseModel): + cursor: str = "" + token: str = "" + limit: int = 100 + open_kfid: str = "" + + +@router.get("/conversations") +async def list_conversations( + open_kfid: str = Query(""), + db: AsyncSession = Depends(get_db), +): + """获取会话列表""" + conversations = await get_conversations(db, open_kfid) + return {"conversations": conversations} + + +@router.get("/messages") +async def list_messages( + external_userid: str = Query(...), + open_kfid: str = Query(""), + page: int = Query(1, ge=1), + page_size: int = Query(50, ge=1, le=200), + db: AsyncSession = Depends(get_db), +): + """获取某客户的消息列表""" + messages = await get_messages(db, external_userid, open_kfid, page, page_size) + return {"messages": messages} + + +@router.post("/send") +async def send_message( + req: SendMsgRequest, + db: AsyncSession = Depends(get_db), +): + """发送消息""" + result = await send_and_save(db, req.external_userid, req.content, req.open_kfid) + return result + + +@router.post("/sync") +async def sync_messages( + req: SyncRequest, + db: AsyncSession = Depends(get_db), +): + """手动拉取消息(轮询模式)""" + kfid = req.open_kfid or settings.open_kfid + result = await sync_msg(kfid, req.cursor, req.token, req.limit) + msg_list = result.get("msg_list", []) + saved = 0 + if msg_list: + saved = await save_messages(db, msg_list) + return { + **result, + "saved": saved, + "total": len(msg_list), + } diff --git a/app/routes/pages.py b/app/routes/pages.py new file mode 100644 index 0000000..dcf0a34 --- /dev/null +++ b/app/routes/pages.py @@ -0,0 +1,13 @@ +from fastapi import APIRouter, Request +from fastapi.templating import Jinja2Templates +from pathlib import Path + +templates_dir = Path(__file__).parent.parent / "templates" +templates = Jinja2Templates(directory=str(templates_dir)) + +router = APIRouter() + + +@router.get("/") +async def index(request: Request): + return templates.TemplateResponse("index.html", {"request": request}) diff --git a/app/routes/webhook.py b/app/routes/webhook.py new file mode 100644 index 0000000..e9f87f0 --- /dev/null +++ b/app/routes/webhook.py @@ -0,0 +1,54 @@ +from fastapi import APIRouter, Request, Query, Depends +from fastapi.responses import PlainTextResponse +from sqlalchemy.ext.asyncio import AsyncSession +from app.database import get_db +from app.services.crypto import verify_url, decrypt_message +from app.services.wechat_client import sync_msg +from app.services.message_service import save_messages +from app.config import settings +import logging + +logger = logging.getLogger(__name__) +router = APIRouter() + + +@router.get("/webhook") +async def verify_callback( + msg_signature: str = Query(..., alias="msg_signature"), + timestamp: str = Query(...), + nonce: str = Query(...), + echostr: str = Query(...), +): + """URL 验证:解密 echostr 并返回明文""" + try: + plain = verify_url(msg_signature, timestamp, nonce, echostr) + return PlainTextResponse(plain) + except Exception as e: + logger.error(f"URL 验证失败: {e}") + return PlainTextResponse("verification failed", status_code=400) + + +@router.post("/webhook") +async def receive_callback(request: Request, db: AsyncSession = Depends(get_db)): + """接收消息回调:解密 XML → 拉取消息 → 入库""" + try: + xml_body = await request.body() + xml_str = xml_body.decode("utf-8") + logger.info(f"收到回调: {xml_str[:200]}") + + token, open_kfid = decrypt_message(xml_str) + if not token or not open_kfid: + logger.warning("解密后 Token 或 OpenKfId 为空") + return PlainTextResponse("fail") + + # 拉取消息 + result = await sync_msg(open_kfid, token=token, limit=100) + msg_list = result.get("msg_list", []) + if msg_list: + saved = await save_messages(db, msg_list) + logger.info(f"回调拉取消息 {len(msg_list)} 条,新增入库 {saved} 条") + + return PlainTextResponse("success") + except Exception as e: + logger.error(f"回调处理失败: {e}") + return PlainTextResponse("fail") diff --git a/app/services/__init__.py b/app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/crypto.py b/app/services/crypto.py new file mode 100644 index 0000000..ba30c2c --- /dev/null +++ b/app/services/crypto.py @@ -0,0 +1,94 @@ +import base64 +import hashlib +import struct +import xml.etree.ElementTree as ET +from Crypto.Cipher import AES +from Crypto.Util.Padding import unpad +from app.config import settings + + +def _get_aes_key() -> bytes: + """将 43 位 EncodingAESKey 解码为 32 字节 AES Key""" + key = settings.callback_aes_key + # 43 位 Base64 → 补齐 = 号 → 解码得 32 字节 + return base64.b64decode(key + "=") + + +def verify_signature(token: str, timestamp: str, nonce: str, encrypt_str: str, + msg_signature: str) -> bool: + """验证消息签名""" + params = sorted([token, timestamp, nonce, encrypt_str]) + raw = "".join(params) + local_sig = hashlib.sha1(raw.encode()).hexdigest() + return local_sig == msg_signature + + +def decrypt(encrypt_str: str) -> str: + """AES-256-CBC 解密""" + aes_key = _get_aes_key() + cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_key[:16]) + raw = base64.b64decode(encrypt_str) + plain = unpad(cipher.decrypt(raw), 16) + + # 去掉前 16 字节随机串 + # 4 字节 msg_len(网络字节序)+ msg + corpid + content = plain[16:] + msg_len = struct.unpack("!I", content[:4])[0] + msg = content[4:4 + msg_len].decode("utf-8") + return msg + + +def encrypt(plain_text: str) -> str: + """AES-256-CBC 加密""" + aes_key = _get_aes_key() + import os + random_bytes = os.urandom(16) + msg_bytes = plain_text.encode("utf-8") + msg_len = struct.pack("!I", len(msg_bytes)) + corpid_bytes = settings.corpid.encode("utf-8") + raw = random_bytes + msg_len + msg_bytes + corpid_bytes + + from Crypto.Util.Padding import pad + cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_key[:16]) + encrypted = cipher.encrypt(pad(raw, 16)) + return base64.b64encode(encrypted).decode() + + +def verify_url(msg_signature: str, timestamp: str, nonce: str, echostr: str) -> str: + """GET 请求:验证签名 + 解密 echostr""" + token = settings.callback_token + # 验证签名 + params = sorted([token, timestamp, nonce, echostr]) + raw = "".join(params) + local_sig = hashlib.sha1(raw.encode()).hexdigest() + if local_sig != msg_signature: + raise ValueError("签名验证失败") + return decrypt(echostr) + + +def decrypt_message(xml_body: str) -> tuple[str, str]: + """POST 请求:解密 XML 消息,返回 (token, open_kfid) + + 注意:解密后的 XML 包含 等字段 + """ + root = ET.fromstring(xml_body) + encrypt_elem = root.find("Encrypt") + if encrypt_elem is None or encrypt_elem.text is None: + raise ValueError("XML 中缺少 Encrypt 字段") + encrypt_str = encrypt_elem.text + + # 解密 + plain_text = decrypt(encrypt_str) + # 解析解密后的 XML + plain_root = ET.fromstring(plain_text) + + token = "" + open_kfid = "" + token_elem = plain_root.find("Token") + kfid_elem = plain_root.find("OpenKfId") + if token_elem is not None: + token = token_elem.text or "" + if kfid_elem is not None: + open_kfid = kfid_elem.text or "" + + return token, open_kfid diff --git a/app/services/message_service.py b/app/services/message_service.py new file mode 100644 index 0000000..9b62b7d --- /dev/null +++ b/app/services/message_service.py @@ -0,0 +1,141 @@ +from datetime import datetime +from sqlalchemy import select, func, desc, and_ +from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.ext.asyncio import AsyncSession +from app.models.message import Message +from app.services.wechat_client import send_msg +from app.config import settings + + +async def save_messages(db: AsyncSession, msg_list: list[dict]) -> int: + """批量保存消息,按 msg_id 去重,返回新增数量""" + saved = 0 + for msg in msg_list: + stmt = pg_insert(Message).values( + msg_id=msg.get("msgid") or msg.get("msg_id", ""), + open_kfid=msg.get("open_kfid", settings.open_kfid), + external_userid=msg.get("external_userid", ""), + servicer_userid=msg.get("servicer_userid"), + send_time=datetime.fromtimestamp(msg.get("send_time", 0)), + msgtype=msg.get("msgtype", "unknown"), + origin=msg.get("origin", "customer"), + content=_extract_text_content(msg), + raw_data=msg, + direction="inbound" if msg.get("origin") != "servicer" else "outbound", + status="received", + ).on_conflict_do_nothing(index_elements=["msg_id"]) + result = await db.execute(stmt) + if result.rowcount: + saved += 1 + await db.commit() + return saved + + +async def get_conversations(db: AsyncSession, open_kfid: str = "", limit: int = 50) -> list[dict]: + """获取会话列表:按 external_userid 分组,显示最新消息""" + kfid = open_kfid or settings.open_kfid + # 子查询:每个客户的最新消息 + subq = ( + select( + Message.external_userid, + func.max(Message.send_time).label("latest_time") + ) + .where(Message.open_kfid == kfid) + .group_by(Message.external_userid) + .order_by(desc("latest_time")) + .limit(limit) + .subquery() + ) + q = ( + select(Message) + .join(subq, and_( + Message.external_userid == subq.c.external_userid, + Message.send_time == subq.c.latest_time + )) + .where(Message.open_kfid == kfid) + .order_by(desc(Message.send_time)) + ) + result = await db.execute(q) + rows = result.scalars().all() + + return [ + { + "external_userid": r.external_userid, + "latest_time": r.send_time.isoformat(), + "latest_content": r.content or "", + "latest_msgtype": r.msgtype, + } + for r in rows + ] + + +async def get_messages(db: AsyncSession, external_userid: str, + open_kfid: str = "", page: int = 1, + page_size: int = 50) -> list[dict]: + """获取某客户的消息列表(时间升序)""" + kfid = open_kfid or settings.open_kfid + offset = (page - 1) * page_size + q = ( + select(Message) + .where( + Message.open_kfid == kfid, + Message.external_userid == external_userid, + ) + .order_by(Message.send_time.asc()) + .offset(offset) + .limit(page_size) + ) + result = await db.execute(q) + rows = result.scalars().all() + + return [ + { + "msg_id": r.msg_id, + "content": r.content, + "msgtype": r.msgtype, + "send_time": r.send_time.isoformat(), + "origin": r.origin, + "direction": r.direction, + } + for r in rows + ] + + +async def send_and_save(db: AsyncSession, external_userid: str, content: str, + open_kfid: str = "") -> dict: + """发送消息并记录到数据库,返回发送结果""" + kfid = open_kfid or settings.open_kfid + result = await send_msg( + touser=external_userid, + open_kfid=kfid, + msgtype="text", + content=content, + ) + errcode = result.get("errcode", -1) + status = "sent" if errcode == 0 else "failed" + + # 记录到数据库 + msg = Message( + msg_id=result.get("msgid", f"out_{int(datetime.now().timestamp())}"), + open_kfid=kfid, + external_userid=external_userid, + servicer_userid="", + send_time=datetime.now(), + msgtype="text", + origin="servicer", + content=content, + raw_data=result, + direction="outbound", + status=status, + ) + db.add(msg) + await db.commit() + return result + + +def _extract_text_content(msg: dict) -> str: + """从消息中提取文本内容""" + if msg.get("msgtype") == "text": + text_data = msg.get("text", {}) + return text_data.get("content", "") if isinstance(text_data, dict) else str(text_data) + return "" diff --git a/app/services/token_manager.py b/app/services/token_manager.py new file mode 100644 index 0000000..31d2446 --- /dev/null +++ b/app/services/token_manager.py @@ -0,0 +1,34 @@ +import time +import asyncio +import httpx +from app.config import settings + +# token 缓存 +_token_cache: dict = {} +_lock = asyncio.Lock() + + +async def get_access_token() -> str: + """获取 access_token,带缓存和并发刷新保护""" + now = int(time.time()) + if _token_cache.get("access_token") and _token_cache.get("expires_at", 0) > now + 300: + return _token_cache["access_token"] + + async with _lock: + # 双重检查 + if _token_cache.get("access_token") and _token_cache.get("expires_at", 0) > now + 300: + return _token_cache["access_token"] + + async with httpx.AsyncClient() as client: + url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken" + resp = await client.get(url, params={ + "corpid": settings.corpid, + "corpsecret": settings.secret, + }) + data = resp.json() + if data.get("errcode") != 0: + raise Exception(f"获取 access_token 失败: {data}") + + _token_cache["access_token"] = data["access_token"] + _token_cache["expires_at"] = now + data["expires_in"] + return _token_cache["access_token"] diff --git a/app/services/wechat_client.py b/app/services/wechat_client.py new file mode 100644 index 0000000..ff23287 --- /dev/null +++ b/app/services/wechat_client.py @@ -0,0 +1,40 @@ +import httpx +from app.services.token_manager import get_access_token + + +WECHAT_API_BASE = "https://qyapi.weixin.qq.com" + + +async def send_msg(touser: str, open_kfid: str, msgtype: str, content: str) -> dict: + """主动发送消息给客户""" + token = await get_access_token() + url = f"{WECHAT_API_BASE}/cgi-bin/kf/send_msg?access_token={token}" + + body = { + "touser": touser, + "open_kfid": open_kfid, + "msgtype": msgtype, + } + if msgtype == "text": + body["text"] = {"content": content} + + async with httpx.AsyncClient() as client: + resp = await client.post(url, json=body) + return resp.json() + + +async def sync_msg(open_kfid: str, cursor: str = "", token: str = "", limit: int = 100) -> dict: + """拉取客服消息(轮询模式)""" + access_token = await get_access_token() + url = f"{WECHAT_API_BASE}/cgi-bin/kf/sync_msg?access_token={access_token}" + + body = { + "cursor": cursor, + "token": token, + "limit": limit, + "open_kfid": open_kfid, + } + + async with httpx.AsyncClient() as client: + resp = await client.post(url, json=body) + return resp.json() diff --git a/app/static/app.js b/app/static/app.js new file mode 100644 index 0000000..4adbf3f --- /dev/null +++ b/app/static/app.js @@ -0,0 +1,162 @@ +// 状态 +let currentUser = ""; +let conversations = []; + +// 初始化 +document.addEventListener("DOMContentLoaded", () => { + loadConversations(); +}); + +// Toast +function showToast(msg, type = "success") { + const el = document.getElementById("toast"); + el.textContent = msg; + el.className = "toast toast-" + type; + el.style.display = "block"; + el.style.opacity = "1"; + setTimeout(() => { el.style.opacity = "0"; setTimeout(() => el.style.display = "none", 300); }, 2000); +} + +// 加载会话列表 +async function loadConversations() { + try { + const resp = await fetch("/api/conversations"); + const data = await resp.json(); + conversations = data.conversations || []; + renderConversations(); + + if (conversations.length === 0) { + showToast("暂无会话,请先点击"同步拉取"获取消息", "error"); + } + } catch (e) { + showToast("加载会话失败: " + e.message, "error"); + } +} + +// 渲染会话列表 +function renderConversations() { + const list = document.getElementById("conversationList"); + if (conversations.length === 0) { + list.innerHTML = '
暂无会话
'; + return; + } + list.innerHTML = conversations.map(c => ` +
+
${c.external_userid}
+
${escapeHtml(c.latest_content || '[非文本消息]')}
+
${formatTime(c.latest_time)}
+
+ `).join(""); +} + +// 选择会话 +async function selectConversation(userid) { + currentUser = userid; + document.getElementById("msgHeader").textContent = "客户: " + userid; + document.getElementById("sendBtn").disabled = false; + document.getElementById("msgInput").disabled = false; + renderConversations(); + await loadMessages(userid); +} + +// 加载消息 +async function loadMessages(userid) { + try { + const resp = await fetch(`/api/messages?external_userid=${userid}`); + const data = await resp.json(); + const msgs = data.messages || []; + renderMessages(msgs); + } catch (e) { + showToast("加载消息失败: " + e.message, "error"); + } +} + +// 渲染消息 +function renderMessages(msgs) { + const container = document.getElementById("messageList"); + if (msgs.length === 0) { + container.innerHTML = '
暂无消息
'; + return; + } + container.innerHTML = msgs.map(m => ` +
+
${escapeHtml(m.content || `[${m.msgtype}]`)}
+
${formatTime(m.send_time)}
+
+ `).join(""); + container.scrollTop = container.scrollHeight; +} + +// 发送消息 +async function sendMessage() { + if (!currentUser) return; + const input = document.getElementById("msgInput"); + const content = input.value.trim(); + if (!content) return; + + const btn = document.getElementById("sendBtn"); + btn.disabled = true; + btn.textContent = "发送中..."; + + try { + const resp = await fetch("/api/send", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ external_userid: currentUser, content }), + }); + const result = await resp.json(); + if (result.errcode === 0) { + input.value = ""; + showToast("发送成功"); + await loadMessages(currentUser); + } else { + showToast("发送失败: " + (result.errmsg || JSON.stringify(result)), "error"); + } + } catch (e) { + showToast("发送失败: " + e.message, "error"); + } finally { + btn.disabled = false; + btn.textContent = "发送"; + } +} + +// 同步拉取 +async function syncMessages() { + const statusEl = document.getElementById("syncStatus"); + statusEl.textContent = "同步中..."; + try { + const resp = await fetch("/api/sync", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({}), + }); + const data = await resp.json(); + if (data.errcode === 0) { + showToast(`同步成功,获取 ${data.total} 条消息,新增入库 ${data.saved} 条`); + await loadConversations(); + if (currentUser) await loadMessages(currentUser); + } else { + showToast("同步失败: " + (data.errmsg || JSON.stringify(data)), "error"); + } + } catch (e) { + showToast("同步失败: " + e.message, "error"); + } finally { + statusEl.textContent = ""; + } +} + +// 格式化时间 +function formatTime(iso) { + if (!iso) return ""; + const d = new Date(iso); + const pad = (n) => String(n).padStart(2, "0"); + return `${d.getFullYear()}-${pad(d.getMonth() + 1)}-${pad(d.getDate())} ${pad(d.getHours())}:${pad(d.getMinutes())}`; +} + +// HTML 转义 +function escapeHtml(text) { + const div = document.createElement("div"); + div.textContent = text; + return div.innerHTML; +} diff --git a/app/templates/index.html b/app/templates/index.html new file mode 100644 index 0000000..ea43e1d --- /dev/null +++ b/app/templates/index.html @@ -0,0 +1,76 @@ + + + + + + 微信客服 - 消息测试 + + + + + + +
+
请选择会话
+
+
选择左侧会话查看消息
+
+
+ + +
+
+ + + + diff --git a/docs/tasks/task_detail_2026_04_27.md b/docs/tasks/task_detail_2026_04_27.md new file mode 100644 index 0000000..eac5b9d --- /dev/null +++ b/docs/tasks/task_detail_2026_04_27.md @@ -0,0 +1,17 @@ +# 任务执行摘要 + +## 会话 ID: 1 +- [2026-04-27 16:10:00] +- **执行原因**: 实现微信客服 API 网页测试应用,支持监听用户消息、主动/被动发送消息、消息存储到 PostgreSQL +- **执行过程**: + 1. 创建项目目录结构(app/models, app/services, app/routes, app/static, app/templates)。 + 2. 实现配置层:config.py(pydantic-settings 读取 .env)和 .env.example 模板。 + 3. 实现数据库层:database.py(AsyncEngine + session)和 models/message.py(Message ORM)。 + 4. 实现微信 API 客户端:token_manager.py(access_token 内存缓存 + asyncio.Lock)和 wechat_client.py(send_msg / sync_msg)。 + 5. 实现回调加解密:crypto.py(签名验证、AES-256-CBC 解密、URL 验证、消息解密)。 + 6. 实现路由层:webhook.py(GET 验证 / POST 接收回调)、api.py(会话列表/消息查询/发送/同步)、pages.py(Jinja2 模板渲染)。 + 7. 实现消息业务层:message_service.py(消息保存去重、会话列表、消息分页、发送并记录)。 + 8. 实现前端界面:index.html(左侧会话列表 + 右侧消息气泡 + 底部输入框)和 app.js(原生 JS 交互)。 + 9. 实现主入口:main.py(FastAPI 挂载路由 + 静态文件 + lifespan 建表)和 run.py(uvicorn 启动)。 + 10. 安装依赖并验证完整导入链和路由注册。 +- **执行结果**: 项目全部 19 个源文件创建完成,导入链和路由验证通过。用户需创建 .env 文件配置微信参数后即可 `python run.py` 启动。 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..eabb554 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +fastapi>=0.110.0 +uvicorn[standard]>=0.29.0 +sqlalchemy[asyncio]>=2.0.30 +asyncpg>=0.29.0 +httpx>=0.27.0 +pycryptodome>=3.20.0 +pydantic-settings>=2.2.0 +jinja2>=3.1.3 +python-multipart>=0.0.9 diff --git a/run.py b/run.py new file mode 100644 index 0000000..68d2f26 --- /dev/null +++ b/run.py @@ -0,0 +1,5 @@ +#!/usr/bin/env python +import uvicorn + +if __name__ == "__main__": + uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)