176 lines
5.8 KiB
Python
176 lines
5.8 KiB
Python
from datetime import datetime
|
||
from sqlalchemy import select, func, desc, and_
|
||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
from app.models.message import Message
|
||
from app.services.wechat_client import send_msg
|
||
from app.config import settings
|
||
|
||
|
||
async def save_messages(db: AsyncSession, msg_list: list[dict]) -> int:
    """Bulk-save messages, de-duplicating on ``msg_id``.

    Every message is written with ``INSERT ... ON CONFLICT (msg_id) DO
    NOTHING``, so a message that is already stored is ignored. All inserts
    are committed together after the loop.

    Args:
        db: Async SQLAlchemy session used for the inserts and the commit.
        msg_list: Raw message dicts. The id is read from ``msgid`` (or
            ``msg_id`` as a fallback); the other keys read are ``open_kfid``,
            ``external_userid``, ``servicer_userid``, ``send_time``,
            ``msgtype``, ``origin`` and ``text``.

    Returns:
        The number of rows that were actually inserted.
    """
    saved = 0
    for msg in msg_list:
        # A message without an id cannot be de-duplicated: every such message
        # would be stored under the same empty key (or the literal "None"),
        # so only the first would survive. Skip them instead.
        msg_id = str(msg.get("msgid") or msg.get("msg_id") or "")
        if not msg_id:
            continue

        origin = _normalize_origin(msg.get("origin", 3))
        stmt = pg_insert(Message).values(
            msg_id=msg_id,
            open_kfid=str(msg.get("open_kfid", settings.open_kfid)),
            external_userid=str(msg.get("external_userid", "")),
            servicer_userid=msg.get("servicer_userid"),
            # `or 0` also covers an explicit null send_time, which would
            # otherwise raise TypeError and abort the whole batch.
            send_time=datetime.fromtimestamp(msg.get("send_time") or 0),
            msgtype=str(msg.get("msgtype", "unknown")),
            origin=origin,
            content=_extract_text_content(msg),
            raw_data=msg,
            # Everything not sent by a servicer is recorded as inbound.
            direction="inbound" if origin != "servicer" else "outbound",
            status="received",
        ).on_conflict_do_nothing(index_elements=["msg_id"])

        result = await db.execute(stmt)
        # rowcount is 0 when the insert was skipped because of a conflict.
        if result.rowcount:
            saved += 1

    await db.commit()
    return saved
|
||
|
||
|
||
async def get_conversations(db: AsyncSession, open_kfid: str = "", limit: int = 50) -> list[dict]:
    """Return the conversation list: one entry per customer with their latest message.

    Args:
        db: Async SQLAlchemy session used to run the query.
        open_kfid: Customer-service account id to filter on; an empty value
            falls back to ``settings.open_kfid``.
        limit: Maximum number of customers to return.

    Returns:
        A list of dicts with the keys ``external_userid``, ``latest_time``
        (ISO-8601 string), ``latest_content`` and ``latest_msgtype``, ordered
        by latest message time, newest first.
    """
    kfid = open_kfid or settings.open_kfid

    # Sub-query: the most recent send_time per customer, newest customers first.
    latest = (
        select(
            Message.external_userid,
            func.max(Message.send_time).label("latest_time"),
        )
        .where(Message.open_kfid == kfid)
        .group_by(Message.external_userid)
        .order_by(desc("latest_time"))
        .limit(limit)
        .subquery()
    )

    # Join back to the full rows to get the content of that latest message.
    q = (
        select(Message)
        .join(
            latest,
            and_(
                Message.external_userid == latest.c.external_userid,
                Message.send_time == latest.c.latest_time,
            ),
        )
        .where(Message.open_kfid == kfid)
        # msg_id is only a tie-breaker so the chosen row is deterministic
        # when several messages share the same timestamp.
        .order_by(desc(Message.send_time), desc(Message.msg_id))
    )
    result = await db.execute(q)

    # Several messages of one customer can share the maximum send_time, in
    # which case the join yields more than one row for that customer. Keep
    # only the first row per customer so each conversation appears once.
    conversations: list[dict] = []
    seen: set = set()
    for row in result.scalars():
        if row.external_userid in seen:
            continue
        seen.add(row.external_userid)
        conversations.append(
            {
                "external_userid": row.external_userid,
                "latest_time": row.send_time.isoformat(),
                "latest_content": row.content or "",
                "latest_msgtype": row.msgtype,
            }
        )
    return conversations
|
||
|
||
|
||
async def get_messages(db: AsyncSession, external_userid: str,
                       open_kfid: str = "", page: int = 1,
                       page_size: int = 50) -> list[dict]:
    """Return one page of a customer's messages, oldest first.

    Args:
        db: Async SQLAlchemy session used to run the query.
        external_userid: Customer whose messages are listed.
        open_kfid: Customer-service account id to filter on; an empty value
            falls back to ``settings.open_kfid``.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; negative values are treated as 0.

    Returns:
        A list of dicts with the keys ``msg_id``, ``content``, ``msgtype``,
        ``send_time`` (ISO-8601 string), ``origin`` and ``direction``.
    """
    kfid = open_kfid or settings.open_kfid

    # A negative OFFSET or LIMIT is rejected by PostgreSQL, so clamp the
    # paging arguments instead of letting the query fail.
    page = max(page, 1)
    page_size = max(page_size, 0)
    offset = (page - 1) * page_size

    q = (
        select(Message)
        .where(
            Message.open_kfid == kfid,
            Message.external_userid == external_userid,
        )
        # msg_id breaks ties between equal timestamps; without it rows could
        # move between pages from one request to the next.
        .order_by(Message.send_time.asc(), Message.msg_id.asc())
        .offset(offset)
        .limit(page_size)
    )
    result = await db.execute(q)
    rows = result.scalars().all()

    return [
        {
            "msg_id": r.msg_id,
            "content": r.content,
            "msgtype": r.msgtype,
            "send_time": r.send_time.isoformat(),
            "origin": r.origin,
            "direction": r.direction,
        }
        for r in rows
    ]
|
||
|
||
|
||
async def send_and_save(db: AsyncSession, external_userid: str, content: str,
                        open_kfid: str = "") -> dict:
    """Send a text message to a customer and record the attempt in the database.

    The message is recorded whether or not the send succeeded: ``status`` is
    ``"sent"`` when the response's ``errcode`` is 0 and ``"failed"``
    otherwise (a response without ``errcode`` counts as failed).

    Args:
        db: Async SQLAlchemy session used to store the outbound message.
        external_userid: Recipient customer id.
        content: Text to send.
        open_kfid: Customer-service account id; an empty value falls back to
            ``settings.open_kfid``.

    Returns:
        The response dict returned by ``send_msg``, unchanged.
    """
    # Local import keeps this block self-contained.
    from uuid import uuid4

    kfid = open_kfid or settings.open_kfid
    result = await send_msg(
        touser=external_userid,
        open_kfid=kfid,
        msgtype="text",
        content=content,
    )
    errcode = result.get("errcode", -1)
    status = "sent" if errcode == 0 else "failed"

    # Use a single timestamp for both the fallback id and send_time.
    now = datetime.now()
    # When the response carries no usable msgid, build a local one. A bare
    # second-resolution timestamp would collide for two sends in the same
    # second, so a random suffix is appended.
    msg_id = result.get("msgid") or f"out_{int(now.timestamp())}_{uuid4().hex[:8]}"

    # Record the outbound message.
    msg = Message(
        msg_id=msg_id,
        open_kfid=kfid,
        external_userid=external_userid,
        servicer_userid="",
        send_time=now,
        msgtype="text",
        origin="servicer",
        content=content,
        raw_data=result,
        direction="outbound",
        status=status,
    )
    db.add(msg)
    await db.commit()
    return result
|
||
|
||
|
||
# Common WeChat customer-service API error codes -> Chinese explanation.
# The values are runtime strings returned to callers; keep them unchanged.
WECHAT_ERROR_MAP = {
    95000: "客服账号 ID (open_kfid) 无效",
    95001: "发送消息数量超限:客户未回复时,12 小时内最多主动发送 5 条消息",
    95002: "客户会话已结束,无法发送消息",
    95003: "客户不在当前客服账号的接待范围内",
    95004: "接待人员未实名,无法发送消息",
    95005: "接待人员未在企业微信客户端在线,无法发送消息",
    48001: "API 接口无权限,请检查 Secret 对应的应用权限",
    40001: "access_token 无效或已过期",
    41001: "缺少 access_token 参数",
    42001: "access_token 过期",
    40003: "不合法的 OpenID",
    40014: "不合法的 access_token",
    45009: "API 调用频率超限",
}
|
||
|
||
|
||
def _wechat_errmsg(result: dict) -> str:
    """Turn the ``errcode`` of a WeChat API response into a readable message.

    Args:
        result: API response dict; ``errcode`` defaults to 0 when absent.

    Returns:
        ``"ok"`` for errcode 0. Otherwise the ``WECHAT_ERROR_MAP`` entry for
        the code, falling back to the response's own ``errmsg`` and finally
        to a generic "unknown error" string that includes the code.
    """
    errcode = result.get("errcode", 0)
    if errcode != 0:
        # Used only when the code has no entry in WECHAT_ERROR_MAP.
        fallback = result.get("errmsg", f"未知错误(errcode={errcode})")
        return WECHAT_ERROR_MAP.get(errcode, fallback)
    return "ok"
|
||
|
||
|
||
def _normalize_origin(origin) -> str:
    """Convert the integer ``origin`` from the WeChat API into a string label.

    Args:
        origin: Origin value from a message; any type is accepted.

    Returns:
        ``"customer"``, ``"system"`` or ``"servicer"`` for the integers 3, 4
        and 5. Any other value is returned as ``str(origin)``.
    """
    # Non-integers (e.g. an already-normalized label) pass through as text.
    if not isinstance(origin, int):
        return str(origin)

    labels = {3: "customer", 4: "system", 5: "servicer"}
    return labels.get(origin, str(origin))
|
||
|
||
|
||
def _extract_text_content(msg: dict) -> str:
    """Extract the text body from a message dict.

    Args:
        msg: Raw message dict; only ``msgtype`` and ``text`` are read.

    Returns:
        The text content for messages whose ``msgtype`` is ``"text"``. An
        empty string is returned for every other message type and when the
        text payload or its ``content`` is missing or null.
    """
    if msg.get("msgtype") != "text":
        return ""

    text_data = msg.get("text")
    # An explicit null payload must not be stored as the literal "None".
    if text_data is None:
        return ""

    if isinstance(text_data, dict):
        content = text_data.get("content")
        # Always return a str, even when content is null or not a string.
        return "" if content is None else str(content)

    # Non-dict payloads are stringified as-is.
    return str(text_data)
|