Files
2026-04-23 17:22:20 +08:00

319 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import os, json, uuid, ssl, time, re
from urllib import request as urlrequest
from telegram import Update
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
ContextTypes,
)
# ========== Environment variables ==========
# Required: subscripting os.environ raises KeyError at import time when
# TELEGRAM_BOT_TOKEN is not set.
BOT_TOKEN = os.environ["TELEGRAM_BOT_TOKEN"]
# [changed] Parse the list of allowed user IDs (comma-separated values are supported).
raw_ids = os.environ.get("TELEGRAM_ALLOWED_USER_ID", "")
# Strip surrounding whitespace, split on commas, and drop empty entries.
# Entries stay strings; _auth_guard compares them against str(user id).
ALLOWED_UIDS = [x.strip() for x in raw_ids.split(',') if x.strip()]
# Path of the JSON state file read and written by the helpers below.
STATE_PATH = os.environ.get("STATE_PATH", "/shared/state.json")
# Used to build the request body for /addtocart.
MARKET_LOCALE = os.environ.get("MITM_MARKET_LOCALE", "en-ng") # e.g. en-ng
MARKET_CODE = os.environ.get("MITM_MARKET_CODE" , "NG") # e.g. NG
# User-Agent header value sent with every cart request.
UA = (
"Mozilla/5.0 (iPhone; CPU iPhone OS 18_3_1 like Mac OS X) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) EdgiOS/133.0.3065.54 "
"Version/18.0 Mobile/15E148 Safari/604.1"
)
# Endpoint that _cart_put_single sends its PUT request to.
CART_API_URL = (
"https://cart.production.store-web.dynamics.com/"
"cart/v1.0/cart/loadCart?cartType=consumer&appId=StoreWeb"
)
# ========== Helpers: reading and writing state.json ==========
def _load_state():
    """Load the shared state mapping from ``STATE_PATH``.

    Returns:
        dict: The parsed JSON object. An empty dict is returned when the
        file is missing or unreadable, does not contain valid JSON, or its
        top-level value is not a JSON object.
    """
    try:
        with open(STATE_PATH, "r", encoding="utf-8") as f:
            state = json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return {}
    # Callers use .get() and item assignment on the result, so anything
    # other than a dict (e.g. a JSON list or string) is treated as empty.
    return state if isinstance(state, dict) else {}
def _save_state(st):
    """Write ``st`` to ``STATE_PATH`` as indented, key-sorted JSON.

    The document is first written to a sibling ``<STATE_PATH>.tmp`` file and
    then moved over the target with ``os.replace``, so the target is only
    replaced once the JSON has been written out completely.
    """
    scratch_path = STATE_PATH + ".tmp"
    with open(scratch_path, "w", encoding="utf-8") as handle:
        json.dump(
            st,
            handle,
            ensure_ascii=False,
            indent=2,
            sort_keys=True,
        )
    os.replace(scratch_path, STATE_PATH)
def _getv(key, default=""):
    """Return the value stored under ``key`` in the state file.

    ``default`` is returned when the key is not present.
    """
    return _load_state().get(key, default)
def _setv(key, value):
    """Store ``value`` under ``key`` in the state file.

    The whole state is reloaded, updated, and written back.
    """
    state = _load_state()
    state.update({key: value})
    _save_state(state)
def _get_products(st=None):
    """Return the captured product triples stored in the state.

    Args:
        st: An already-loaded state dict. When ``None`` the state is read
            via ``_load_state()``.

    Returns:
        list[dict]: One dict per valid entry of ``st["__xbox_products__"]``,
        each with exactly the keys ``ProductId``, ``SkuId`` and
        ``AvailabilityId`` (stringified and stripped). Entries that are not
        dicts, or in which any of the three fields is missing, ``None`` or
        blank, are skipped.
    """
    if st is None:
        st = _load_state()
    if not isinstance(st, dict):
        return []
    raw_items = st.get("__xbox_products__", [])
    if not isinstance(raw_items, list):
        return []

    field_names = ("ProductId", "SkuId", "AvailabilityId")
    out = []
    for item in raw_items:
        if not isinstance(item, dict):
            continue
        triple = {}
        for name in field_names:
            value = item.get(name)
            # A JSON null must count as "missing"; str(None) would yield the
            # truthy string "None" and let an incomplete triple through.
            triple[name] = "" if value is None else str(value).strip()
        if all(triple.values()):
            out.append(triple)
    return out
def _save_products_no_notify(new_list):
    """Replace the stored product list with ``new_list`` (de-duplicated).

    Used by the bot's manual commands to update both ``__xbox_products__``
    and the derived ``XboxProductList`` string in the state file. This
    function sends no Telegram message (per the original note, automatic
    pushes are done by the mitmproxy plugin, not here).

    Args:
        new_list: Iterable of dicts carrying ``ProductId``, ``SkuId`` and
            ``AvailabilityId``. For duplicates, the first occurrence wins.
    """
    # dict.setdefault keeps the first entry per fingerprint and preserves
    # insertion order.
    first_seen = {}
    for entry in new_list:
        fingerprint = f"{entry['ProductId']}||{entry['SkuId']}||{entry['AvailabilityId']}"
        first_seen.setdefault(fingerprint, entry)
    deduped = list(first_seen.values())

    state = _load_state()
    state["__xbox_products__"] = deduped
    state["XboxProductList"] = ";".join(
        f"product{position}={entry['ProductId']}|{entry['SkuId']}|{entry['AvailabilityId']}"
        for position, entry in enumerate(deduped, 1)
    )
    _save_state(state)
# ========== Helper: add to cart (reuses the mitmproxy logic) ==========
def _cart_put_single(muid: str, ms_cv: str, pid: str, sid: str, aid: str, x_vec: str = ""):
    """Send one add-to-cart PUT request for a (ProductId, SkuId, AvailabilityId).

    Args:
        muid: Value for the ``x-authorization-muid`` header.
        ms_cv: Value for the ``ms-cv`` header.
        pid: Product id placed in the request payload.
        sid: SKU id placed in the request payload.
        aid: Availability id placed in the request payload.
        x_vec: Optional ``x-ms-vector-id`` header value; the header is
            omitted when empty.

    Returns:
        tuple: ``(status, ok, body)`` where ``status`` is the HTTP status
        code, ``ok`` is True for 2xx responses, and ``body`` is at most the
        first 2048 bytes of the raw response body. Because the request
        advertises ``accept-encoding``, the body may be compressed; it is
        returned undecoded.

    Raises:
        urllib.error.URLError, OSError: On connection-level failures
        (DNS, TLS, timeout). HTTP error statuses are *returned*, not raised.
    """
    # Local import keeps this block self-contained without touching the
    # module's import header.
    from urllib.error import HTTPError

    payload = {
        "locale": MARKET_LOCALE,
        "market": MARKET_CODE,
        "catalogClientType": "storeWeb",
        "friendlyName": f"cart-{MARKET_CODE}",
        "riskSessionId": str(uuid.uuid4()),
        "clientContext": {"client": "UniversalWebStore.Cart", "deviceType": "Pc"},
        "itemsToAdd": {
            "items": [
                {
                    "productId": pid,
                    "skuId": sid,
                    "availabilityId": aid,
                    "campaignId": "xboxcomct",
                    "quantity": 1,
                }
            ]
        },
    }
    data = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    req = urlrequest.Request(CART_API_URL, data=data, method="PUT")
    req.add_header("content-type", "application/json")
    req.add_header("accept", "*/*")
    req.add_header("x-authorization-muid", muid or "")
    req.add_header("x-validation-field-1", "9pgbhbppjf2b")
    req.add_header("ms-cv", ms_cv or "")
    if x_vec:
        req.add_header("x-ms-vector-id", x_vec)
    req.add_header("accept-language", "en-US,en;q=0.9")
    req.add_header("accept-encoding", "gzip, deflate, br")
    req.add_header("sec-fetch-site", "cross-site")
    req.add_header("sec-fetch-mode", "cors")
    req.add_header("sec-fetch-dest", "empty")
    req.add_header("origin", "https://www.microsoft.com")
    req.add_header("referer", "https://www.microsoft.com/")
    req.add_header("user-agent", UA)
    ctx_ssl = ssl.create_default_context()
    try:
        with urlrequest.urlopen(req, context=ctx_ssl, timeout=20) as resp:
            status = resp.status
            # Read only what is returned instead of downloading the whole
            # body and slicing it afterwards.
            body = resp.read(2048)
    except HTTPError as err:
        # urlopen raises HTTPError for non-2xx responses; surface the status
        # code through the normal return shape so callers can report it.
        try:
            body = err.read(2048)
            err.close()
        except (OSError, ValueError, AttributeError):
            body = b""
        return err.code, False, body
    return status, (200 <= status < 300), body
# ========== Authorization check / reply helper ==========
async def _auth_guard(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Check whether the sender of ``update`` is allowed to use the bot.

    The sender's numeric id is compared, as a string, against
    ``ALLOWED_UIDS``. When it is not listed, an "Unauthorized" message that
    includes the id (to ease debugging) is sent to the originating chat.

    Returns:
        bool: True when the sender is authorized, False otherwise. Updates
        that carry no sender are rejected silently.
    """
    user = update.effective_user
    if user is None:
        # No sender to authorize (presumably e.g. channel posts); reject
        # instead of failing with AttributeError on ``None.id``.
        return False
    uid = str(user.id)
    if uid in ALLOWED_UIDS:
        return True
    chat = update.effective_chat
    if chat is not None:
        await context.bot.send_message(
            chat_id=chat.id,
            text=f"❌ Unauthorized (UID: {uid})",  # shows the rejected id for debugging
            disable_notification=False,
        )
    return False
def _split_message(text: str, limit: int = 4096):
    """Split ``text`` into chunks of at most ``limit`` characters.

    Text that already fits is returned unchanged as a single chunk. Longer
    text is cut at the last newline inside the limit when there is one
    (the newline itself is dropped), otherwise hard-cut at ``limit``.
    Whitespace-only chunks produced by splitting are discarded.
    """
    if len(text) <= limit:
        return [text]
    chunks = []
    rest = text
    while len(rest) > limit:
        cut = rest.rfind("\n", 0, limit + 1)
        if cut <= 0:
            # No usable line break inside the window: hard split.
            head, rest = rest[:limit], rest[limit:]
        else:
            head, rest = rest[:cut], rest[cut + 1:]
        chunks.append(head)
    chunks.append(rest)
    return [chunk for chunk in chunks if chunk.strip()]


async def _reply(update: Update, context: ContextTypes.DEFAULT_TYPE, text: str):
    """Send ``text`` to the chat that ``update`` came from.

    Every message is sent with ``disable_notification=False`` so the user
    always gets a push notification. Text longer than 4096 characters (the
    Telegram Bot API limit for one message) is sent as several consecutive
    messages instead of being rejected by the API.
    """
    # NOTE(review): the limit is compared against Python string length;
    # confirm whether Telegram counts some characters (e.g. emoji) as more
    # than one unit.
    for chunk in _split_message(text):
        await context.bot.send_message(
            chat_id=update.effective_chat.id,
            text=chunk,
            disable_notification=False,
        )
# ========== Command handlers ==========
async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start: greet an authorized user and point them to /help."""
    authorized = await _auth_guard(update, context)
    if not authorized:
        return
    greeting = "👋 你好!我是 Xbox 助手。\n可以使用 /help 查看帮助"
    await _reply(update, context, greeting)
async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /help: send the list of available commands to an authorized user."""
    authorized = await _auth_guard(update, context)
    if not authorized:
        return
    # One entry per command; each piece already ends with its newline.
    help_lines = [
        "可用指令:\n",
        "/check - 查看当前关键参数(cartId等)和XboxProductList摘要\n",
        "/viewproducts - 查看已捕捉到的全部三元组(ProductId, SkuId, AvailabilityId)\n",
        "/clearproducts - 清空捕捉到的产品列表\n",
        "/addtocart - 尝试把所有已捕捉的产品逐个加进购物车\n",
        "/help - 再次查看本帮助\n",
    ]
    await _reply(update, context, "".join(help_lines))
async def cmd_check(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /check: report the key values currently held in the state file.

    The reply lists the cart id, the length of the stored authorization
    value (not the value itself), the captured cart headers, the
    ``XboxProductList`` string and the number of valid product triples.
    """
    authorized = await _auth_guard(update, context)
    if not authorized:
        return

    state = _load_state()
    cart_id = state.get("cartId", "")
    auth = state.get("authorization", "")
    muid = state.get("fiddler.custom.cart-x-authorization-muid", "")
    ms_cv = state.get("fiddler.custom.cart-ms-cv", "")
    xvec = state.get("fiddler.custom.cart-x-ms-vector-id", "")
    plist_str = state.get("XboxProductList", "")
    countp = len(_get_products(state))

    report_lines = [
        "📦 /check\n",
        f"cartId: {cart_id or '(none)'}\n",
        f"authorization(len): {len(auth) if auth else 0}\n",
        f"x-authorization-muid: {muid or '(none)'}\n",
        f"ms-cv: {ms_cv or '(none)'}\n",
        f"x-ms-vector-id: {xvec or '(none)'}\n",
        f"XboxProductList: {plist_str or '(empty)'}\n",
        f"products_count: {countp}\n",
    ]
    await _reply(update, context, "".join(report_lines))
async def cmd_viewproducts(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /viewproducts: list every captured product triple, numbered from 1."""
    authorized = await _auth_guard(update, context)
    if not authorized:
        return

    captured = _get_products()
    if captured:
        rows = [
            f"{position}. {entry['ProductId']} | {entry['SkuId']} | {entry['AvailabilityId']}"
            for position, entry in enumerate(captured, 1)
        ]
        message = "\n".join(["📄 /viewproducts", *rows])
    else:
        message = "📄 /viewproducts\n当前产品列表为空"
    await _reply(update, context, message)
async def cmd_clearproducts(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /clearproducts: empty the stored product list and confirm."""
    authorized = await _auth_guard(update, context)
    if authorized:
        # Saving an empty list clears the list and writes the state back.
        _save_products_no_notify([])
        await _reply(update, context, "🗑 已清空产品列表")
async def cmd_addtocart(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /addtocart: try to add every captured product to the cart.

    Each product triple is sent through ``_cart_put_single`` one at a time.
    Products that were added successfully are removed from the stored list;
    failed ones stay for a later retry. A summary (at most 20 successes and
    20 failures listed) is sent back to the chat.

    The HTTP call is blocking, so it runs in the default executor and the
    pause between items uses ``asyncio.sleep``; this keeps the bot's event
    loop responsive while the requests are in flight.
    """
    # Local import: the module header does not import asyncio.
    import asyncio

    if not await _auth_guard(update, context):
        return
    st = _load_state()
    products = _get_products(st)
    if not products:
        await _reply(update, context, "🛒 /addtocart\n没有待加入的产品")
        return
    muid = st.get("fiddler.custom.cart-x-authorization-muid", "")
    ms_cv = st.get("fiddler.custom.cart-ms-cv", "")
    x_vec = st.get("fiddler.custom.cart-x-ms-vector-id", "")
    if (not muid) or (not ms_cv):
        await _reply(update, context,
                     "🛒 /addtocart\n缺少 cart-x-authorization-muid 或 cart-ms-cv无法继续")
        return

    loop = asyncio.get_running_loop()
    successes = []
    failures = []
    added = set()  # (pid, sid, aid) triples that were accepted
    for item in products:
        pid = item["ProductId"]
        sid = item["SkuId"]
        aid = item["AvailabilityId"]
        triple_label = f"{pid}/{sid}/{aid}"
        try:
            code, ok, _ = await loop.run_in_executor(
                None, _cart_put_single, muid, ms_cv, pid, sid, aid, x_vec
            )
        except Exception as ex:
            # Best effort per item: one failure must not abort the batch.
            # An HTTP error raised by urllib carries its status in ``.code``;
            # anything else is reported as "ERR".
            raw_code = getattr(ex, "code", None)
            code = raw_code if isinstance(raw_code, int) else 0
            ok = False
        if ok:
            successes.append(f"{triple_label} -> {code}")
            added.add((pid, sid, aid))
        else:
            failures.append(f"{triple_label} -> {code or 'ERR'}")
        # Small pause between requests, without blocking the event loop.
        await asyncio.sleep(0.15)

    # Keep only the products that were not added successfully.
    remain = [
        r for r in products
        if (r["ProductId"], r["SkuId"], r["AvailabilityId"]) not in added
    ]
    # Update products / XboxProductList in state.json (local change only,
    # no notification is pushed).
    # NOTE(review): this overwrites the stored list with the snapshot taken
    # at the start; confirm whether another writer can add products meanwhile.
    _save_products_no_notify(remain)

    summary = (
        "🛒 /addtocart 完成\n"
        f"尝试: {len(products)}\n"
        f"成功: {len(successes)}\n"
        f"失败: {len(failures)}\n\n"
    )
    if successes:
        summary += "✅ 成功:\n" + "\n".join(successes[:20]) + ("\n..." if len(successes) > 20 else "") + "\n"
    if failures:
        summary += "❌ 失败:\n" + "\n".join(failures[:20]) + ("\n..." if len(failures) > 20 else "") + "\n"
    summary += f"\n剩余待加入: {len(remain)}"
    await _reply(update, context, summary)
# ========== Entry point ==========
def main():
    """Build the Telegram application, register the command handlers, and poll."""
    application = ApplicationBuilder().token(BOT_TOKEN).build()

    # Command name -> handler coroutine, registered in this order.
    command_table = (
        ("start", cmd_start),
        ("help", cmd_help),
        ("check", cmd_check),
        ("viewproducts", cmd_viewproducts),
        ("clearproducts", cmd_clearproducts),
        ("addtocart", cmd_addtocart),
    )
    for command_name, callback in command_table:
        application.add_handler(CommandHandler(command_name, callback))

    # Plain long polling. Whether a message is silent is decided in our
    # send_message calls via disable_notification=False.
    application.run_polling(drop_pending_updates=True)


if __name__ == "__main__":
    main()