commit 401c879ffd4d4ac4833796ae72b34f142e1ce840 Author: cat_shark <1716967236@qq.com> Date: Wed May 20 22:11:48 2026 +0800 Initial commit: EF course autopilot tool Auto-complete EF English courses with JWT token authentication. Supports multiple task types: multiple-choice, gapfill, matching, flashcards, speaking-practice, text-highlights, sequencing, media-with-time-markers, language-focus. Co-Authored-By: Claude Opus 4.7 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..19727d0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,20 @@ +# Python +__pycache__/ +*.py[cod] +*.egg-info/ +dist/ +build/ + +# IDE +.idea/ + +# Environment +.env +venv/ +.venv/ + +# Token +token.txt + +# OS +.DS_Store diff --git a/README.md b/README.md new file mode 100644 index 0000000..6d28d13 --- /dev/null +++ b/README.md @@ -0,0 +1,75 @@ +# EF Course Autopilot + +自动完成 EF (English First) 企业英语课程的工具。 + +## 文件说明 + +| 文件 | 说明 | +|---|---| +| `ef_course_autopilot.py` | 核心脚本 — 自动完成单门课程 | +| `ef_course_loop.py` | 交互式循环脚本 — 批量完成多门课程(**推荐使用**)| +| `token.txt` | JWT token 文件(需自行创建) | + +## 快速开始 + +### 1. 获取 JWT Token + +1. 用 Chrome 打开 `https://learn.corporate.ef.com.cn` 并登录 +2. 按 F12 打开开发者工具 → 切换到 **Network(网络)** 标签 +3. 找一个请求,复制其请求头中 `ef_access_token` 的值(一长串 JWT 字符串) +4. 将复制的 token 粘贴到 `token.txt` 文件,**仅保留一行内容,不要有多余换行或空格** + +### 2. 运行 + +```bash +python3 ef_course_loop.py +``` + +脚本会自动读取运行目录下的 `token.txt`,然后交互式询问课程数量,完成后询问是否继续。 + +如需指定自定义 token 文件路径: + +```bash +python3 ef_course_loop.py --token-file /path/to/token.txt +``` + +SSL 验证默认禁用(macOS Python 证书问题),如需启用: + +```bash +python3 ef_course_loop.py --verify-ssl +``` + +## 工作原理 + +脚本模拟浏览器请求,按以下步骤自动完成课程: + +1. **GET focus** — 获取当前课程的 courseId / nodeId +2. **POST open-lesson** — 打开课程,获取 lessonId +3. **POST lesson/command (open-lesson)** — 初始化课程,获取全部 Activity/Task +4. **POST lesson/command (submit-task-response) ×N** — 逐个提交任务答案 + +### 支持的任务类型 + +- `multiple-choice` — 选择题(随机选一个选项) +- `language-focus` — 语言知识点(标记已查看) +- `media-with-time-markers` — 音视频(标记已播放) +- `gapfill` — 填空(保留 expectedResponse) +- `matching` — 匹配题(保留 expectedResponse) +- `flashcards` — 闪卡(所有卡片标记已翻转) +- `text-highlights` — 文本高亮(标记已查看) +- `sequencing` — 排序(保留 expectedResponse) +- `speaking-practice` — 口语练习(可跳过) + +## SSL 证书问题 + +macOS 上 Python 3.12 可能出现 `SSL: CERTIFICATE_VERIFY_FAILED` 错误。 + +**解决办法一:安装证书** +```bash +/Applications/Python\ 3.12/Install\ Certificates.command +``` + +**解决办法二:启用验证**(如需) +```bash +python3 ef_course_loop.py --verify-ssl +``` diff --git a/build.py b/build.py new file mode 100644 index 0000000..a22e1bb --- /dev/null +++ b/build.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 +""" +打包脚本 — 使用 PyInstaller 将 ef_course_loop.py 打包为单文件可执行程序。 + +用法: + python3 build.py + +依赖: + pip3 install pyinstaller + +注意:需要在目标平台上分别运行打包(PyInstaller 不支持交叉编译)。 +""" + +import os +import platform +import shutil +import subprocess +import sys + + +def main(): + system = platform.system() + print(f"当前平台: {system}") + + # 检查 PyInstaller + if not shutil.which("pyinstaller"): + print("❌ 未找到 PyInstaller,请先安装: pip3 install pyinstaller") + sys.exit(1) + + # 清理上次打包产物 + for p in ["build", "dist", "ef_course_autopilot.spec"]: + if os.path.exists(p): + shutil.rmtree(p) if os.path.isdir(p) else os.remove(p) + print(f" 清理: {p}") + + # 根据平台设置输出名称 + output_name = "ef_course_autopilot.exe" if system == "Windows" else "ef_course_autopilot" + + print("\n开始打包...") + cmd = [ + "pyinstaller", + "--onefile", + "--name", output_name, + "--distpath", "dist", + "--workpath", "build", + "--specpath", ".", + "ef_course_loop.py", + ] + + result = subprocess.run(cmd) + if result.returncode != 0: + print(f"\n❌ 打包失败 (exit code: {result.returncode})") + sys.exit(1) + + # 打包完成后附带 token.txt.example 到 dist 目录 + example_src = "token.txt.example" + example_dst = os.path.join("dist", example_src) + if os.path.exists(example_src): + shutil.copy2(example_src, example_dst) + + print(f"\n{'=' * 50}") + print(f" ✅ 打包成功!") + print(f" 输出: dist/{output_name}") + print(f" 提示: 将 token.txt 放在可执行文件同目录下即可运行") + print(f"{'=' * 50}") + + +if __name__ == "__main__": + main() diff --git a/ef_course_autopilot.py b/ef_course_autopilot.py new file mode 100755 index 0000000..3097e47 --- /dev/null +++ b/ef_course_autopilot.py @@ -0,0 +1,1111 @@ +#!/usr/bin/env python3 +""" +EF Course Autopilot — 自动完成EF课程流程的测试脚本 + +基于HAR请求依赖分析,自动完成课程的最简路径为: + ① GET /wl/api/study-plan/focus → 获取courseId/nodeId + ② POST /study/progress/enrollments/{courseId}/open-lesson → 获取lessonId + ③ POST /study/lesson/command (open-lesson) → 获取sessionId/activityId/taskId/lastVersion + ④ POST /study/lesson/command (submit-task-response) ×N → 逐个提交任务 + +认证机制: + - 步骤① (learn.corporate域): 需要 X-EF-Access JWT token(约10小时有效期) + - 步骤②-④ (api.ef.studio域): 需要 Authorization: Bearer JWT token + +最简使用: + python3 ef_course_autopilot.py --token "eyJraWQiOi..." + +可选参数: + --base-url EF学习平台基础URL (默认: https://learn.corporate.ef.com.cn) + --studio-api Studio API基础URL (默认: https://api.ef.studio.ef.com.cn) + --locale 界面语言 (默认: zh-CN) + --timezone 时区 (默认: Asia/Shanghai) + --course-id 直接指定courseId (配合--node-id使用, 跳过focus) + --node-id 直接指定要打开的课时nodeId (配合--course-id使用) + --delay 每次submit之间的延迟秒数 (默认: 1.0) + --skip-on-fail 提交失败时自动跳过该任务继续下一个 + --dry-run 仅打印将要执行的请求,不实际发送 + --verbose 打印详细请求/响应信息 +""" + +import argparse +import json +import random +import sys +import time +import traceback +import uuid +from dataclasses import dataclass, field +from typing import Any, Optional + +import requests + +# ============================================================ +# 配置 +# ============================================================ + +DEFAULT_BASE_URL = "https://learn.corporate.ef.com.cn" +DEFAULT_STUDIO_API = "https://api.ef.studio.ef.com.cn" +DEFAULT_LOCALE = "zh-CN" +DEFAULT_TIMEZONE = "Asia/Shanghai" +DEFAULT_DELAY = 1.0 + +# 超时设置(秒) +REQUEST_TIMEOUT = 30 + +# 最大重试次数 +MAX_RETRIES = 2 + + +# ============================================================ +# 数据类 +# ============================================================ + +@dataclass +class CourseContext: + """从focus API获取的课程上下文""" + course_id: str = "" + level_id: str = "" + unit_id: str = "" + node_id: str = "" # 要打开的课时ID (= lesson.id) + + +@dataclass +class LessonSession: + """课程会话信息""" + lesson_id: str = "" + session_id: str = "" + last_version: int = 0 + + +@dataclass +class TaskInfo: + """单个任务的完整信息""" + activity_id: str = "" + task_id: str = "" + task_type: str = "" + expected_response: dict = field(default_factory=dict) + task_detail: dict = field(default_factory=dict) # task子结构, 包含各类型的具体数据 + + +@dataclass +class ActivityInfo: + """一个Activity(含多个task)""" + activity_id: str = "" + tasks: list[TaskInfo] = field(default_factory=list) + + +@dataclass +class SubmitResult: + """单个任务提交结果""" + task_id: str = "" + task_type: str = "" + success: bool = False + skipped: bool = False + error: str = "" + new_version: int = 0 + + +# ============================================================ +# 自动答题策略 +# ============================================================ + +def build_response_media_with_time_markers(task: TaskInfo) -> dict: + """media-with-time-markers: 视频标记 — 全部标记为已播放""" + markers = task.task_detail.get("mediaWithTimeMarkers", {}) + targets = markers.get("targets", []) + target_responses = {} + for t in targets: + tid = t.get("id", "") + target_responses[tid] = {"id": tid, "userInput": "played"} + return { + "mediaWithTimeMarkers": { + "videoEnded": True, + "targetResponses": target_responses, + } + } + + +def build_response_language_focus(task: TaskInfo) -> dict: + """language-focus: 语言聚焦 — 标记已查看, 全部媒体已播放""" + lf = task.task_detail.get("languageFocus", {}) + media_items = lf.get("mediaItems", []) + media_state = {} + for m in media_items: + mid = m.get("id", "") + media_state[mid] = {"id": mid, "userInput": "played"} + return { + "languageFocus": { + "contentSeen": True, + "mediaState": media_state, + } + } + + +def build_response_sequencing(task: TaskInfo) -> dict: + """sequencing: 排序题 — 优先使用expectedResponse中的正确顺序""" + expected = task.expected_response.get("contents", {}).get("sequencing", {}) + if expected: + return {"sequencing": expected} + + # 回退:按items出现顺序分配0,1,2... + seq = task.task_detail.get("sequencing", {}) + items = seq.get("items", []) + result = {} + for i, item in enumerate(items): + iid = item.get("id", "") + result[iid] = {"id": iid, "userInput": i} + return {"sequencing": result} + + +def build_response_gapfill(task: TaskInfo) -> dict: + """gapfill: 填空题 — 优先使用expectedResponse中的正确答案""" + expected = task.expected_response.get("contents", {}).get("gapfill", {}) + if expected: + return {"gapfill": expected} + + # 回退:从task.detail获取gap列表, 填入空字符串 + gf = task.task_detail.get("gapfill", {}) + gaps = gf.get("gaps", gf.get("items", [])) + result = {} + for g in gaps: + gid = g.get("id", "") + result[gid] = {"id": gid, "userInput": ""} + return {"gapfill": result} + + +def build_response_matching(task: TaskInfo) -> dict: + """matching: 匹配题 — 优先使用expectedResponse中的正确匹配""" + expected = task.expected_response.get("contents", {}).get("matching", {}) + if expected: + return {"matching": expected} + + # 回退:从task.detail获取items, 空匹配 + mf = task.task_detail.get("matching", {}) + items = mf.get("items", []) + result = {} + for item in items: + iid = item.get("id", "") + result[iid] = {"id": iid, "userInput": ""} + return {"matching": result} + + +def build_response_speaking_practice(task: TaskInfo) -> dict: + """speaking-practice: 口语练习 — 使用expectedResponse + 低分""" + expected = task.expected_response.get("contents", {}).get("speakingPractice", {}) + if expected: + user_input = expected.get("userInput", {}) + if isinstance(user_input, dict): + user_input.setdefault("speechScoreSummary", { + "score": 0.4, + "source": "", + "sourceResponse": {}, + "wordScores": [], + "alternativeInputScores": {}, + }) + user_input.setdefault("data", []) + return {"speakingPractice": expected} + + # 回退:构造最低限度通过的回答 + sp = task.task_detail.get("speakingPractice", {}) + target_text = sp.get("targetText", "Hello.") + target_locale = sp.get("targetLocale", "en_US") + return { + "speakingPractice": { + "userInput": { + "data": [], + "id": "", + "speechScoreSummary": { + "alternativeInputScores": {}, + "score": 0.4, + "source": "", + "sourceResponse": {}, + "wordScores": [], + }, + "targetLocale": target_locale, + "targetText": target_text, + } + } + } + + +def build_response_multiple_choice(task: TaskInfo) -> dict: + """multiple-choice: 选择题 — 优先使用expectedResponse中的正确选项""" + expected = task.expected_response.get("contents", {}).get("multipleChoice", {}) + if expected: + return {"multipleChoice": expected} + + # 回退:从task.detail获取options, 随机选一个 + mc = task.task_detail.get("multipleChoice", {}) + contents = mc.get("contents", {}) + options = mc.get("options", list(contents.keys()) if contents else []) + + result = {} + selected = False + for oid in (options if isinstance(options, list) else [options]): + if isinstance(oid, dict): + oid = oid.get("id", "") + result[oid] = {"id": oid, "userInput": "selected" if not selected else "not-selected"} + selected = True + return {"multipleChoice": {"contents": result}} + + +def build_response_text_highlights(task: TaskInfo) -> dict: + """text-highlights: 文本高亮 — 标记所有内容已查看""" + expected = task.expected_response.get("contents", {}).get("textHighlights", {}) + if expected: + return {"textHighlights": expected} + + th = task.task_detail.get("textHighlights", {}) + highlights = th.get("highlights", th.get("items", [])) + result = {} + for h in highlights: + hid = h.get("id", "") + result[hid] = {"id": hid, "userInput": "viewed"} + return {"textHighlights": result} + + +def build_response_flashcards(task: TaskInfo) -> dict: + """flashcards: 闪卡 — 优先使用expectedResponse,否则标记所有卡片已翻转、媒体已播放""" + expected = task.expected_response.get("contents", {}).get("flashcards", {}) + if expected: + return {"flashcards": expected} + + # 回退:从task.detail获取卡片信息,标记全部已翻转/播放 + fc = task.task_detail.get("flashcards", {}).get("flashcards", []) + cards_state = {} + media_state = {} + for card in fc: + cid = card.get("id", "") + if cid: + cards_state[cid] = {"id": cid, "userInput": "flipped"} + audio_id = card.get("audio", {}).get("id", "") + if audio_id: + media_state[audio_id] = {"id": audio_id, "userInput": "played"} + image_id = card.get("image", {}).get("id", "") + if image_id: + media_state[image_id] = {"id": image_id, "userInput": "played"} + return { + "flashcards": { + "contentSeen": True, + "cardsState": cards_state, + "mediaState": media_state, + } + } + + +# 任务类型 → 答题策略映射 +RESPONSE_BUILDERS = { + "media-with-time-markers": build_response_media_with_time_markers, + "language-focus": build_response_language_focus, + "sequencing": build_response_sequencing, + "gapfill": build_response_gapfill, + "matching": build_response_matching, + "flashcards": build_response_flashcards, + "speaking-practice": build_response_speaking_practice, + "multiple-choice": build_response_multiple_choice, + "text-highlights": build_response_text_highlights, +} + + +# ============================================================ +# 核心客户端 +# ============================================================ + +class EFCourseAutopilot: + """EF课程自动完成客户端 + + 认证机制: + - 步骤① (learn.corporate域): 需要 X-EF-Access JWT token + - 步骤②-④ (api.ef.studio域): 需要 Authorization: Bearer JWT token + """ + + def __init__( + self, + token: str = "", + cookie: str = "", + base_url: str = DEFAULT_BASE_URL, + studio_api: str = DEFAULT_STUDIO_API, + locale: str = DEFAULT_LOCALE, + timezone: str = DEFAULT_TIMEZONE, + delay: float = DEFAULT_DELAY, + skip_on_fail: bool = False, + verify_ssl: bool = False, + dry_run: bool = False, + verbose: bool = False, + ): + self.token = token + self.cookie = cookie + self.base_url = base_url.rstrip("/") + self.studio_api = studio_api.rstrip("/") + self.locale = locale + self.timezone = timezone + self.delay = delay + self.skip_on_fail = skip_on_fail + self.verify_ssl = verify_ssl + self.dry_run = dry_run + self.verbose = verbose + + # 根据locale推断instructionsLocale (zh-CN → zh_CN) + self.instructions_locale = locale.replace("-", "_") + if self.instructions_locale not in ("zh_CN", "en_US", "ja_JP", "ko_KR", "de_DE", "fr_FR", "es_ES", "pt_BR"): + self.instructions_locale = "en_US" + + self.session = requests.Session() + self._setup_session() + + # 状态 + self.ctx = CourseContext() + self.lesson = LessonSession() + self.activities: list[ActivityInfo] = [] + self.results: list[SubmitResult] = [] + + def _setup_session(self): + """配置HTTP会话的通用headers和SSL设置""" + self.session.headers.update({ + "Accept": "application/json, text/plain, */*", + "Content-Type": "application/json", + "User-Agent": ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/148.0.0.0 Safari/537.36" + ), + "Cache-Control": "no-cache", + "Pragma": "no-cache", + }) + + # SSL证书验证 + if not self.verify_ssl: + self.session.verify = False + # 禁用InsecureRequestWarning警告 + import urllib3 + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + def _learn_headers(self) -> dict: + """learn.corporate域名的请求headers (需要X-EF-Access或Cookie) + + 认证优先级: + 1. X-EF-Access token (JWT, ~10小时有效) + 2. Cookie (浏览器完整Cookie字符串) + """ + headers = { + "Referer": f"{self.base_url}/wl/study-plan?product=b2b", + "x-ef-correlation-id": self._correlation_id(), + } + if self.token: + headers["X-EF-Access"] = self.token + if self.cookie: + headers["Cookie"] = self.cookie + return headers + + + def _studio_headers(self) -> dict: + """api.ef.studio域名的请求headers + + 需要 Authorization: Bearer JWT token。 + """ + headers = { + "Origin": "https://lesson-player.ef.studio.ef.com.cn", + "Referer": "https://lesson-player.ef.studio.ef.com.cn/", + "x-ef-correlation-id": self._correlation_id(), + } + if self.token: + headers["Authorization"] = f"Bearer {self.token}" + return headers + + def _correlation_id(self) -> str: + """生成x-ef-correlation-id (UUID格式)""" + return str(uuid.uuid4()) + + def _log(self, msg: str): + """普通日志""" + print(f" {msg}") + + def _log_step(self, step: str, msg: str): + """步骤日志""" + print(f"\n{'='*60}") + print(f" {step}: {msg}") + print(f"{'='*60}") + + def _log_progress(self, current: int, total: int, label: str): + """进度日志""" + pct = current / total * 100 if total > 0 else 0 + bar_len = 30 + filled = int(bar_len * current / total) if total > 0 else 0 + bar = "█" * filled + "░" * (bar_len - filled) + print(f"\r [{bar}] {pct:.0f}% ({current}/{total}) {label}", end="", flush=True) + + # -------------------------------------------------------- + # 步骤①: 获取课程焦点 + # -------------------------------------------------------- + + def step_get_focus(self) -> CourseContext: + """GET /wl/api/study-plan/focus → 获取courseId/levelId/unitId/nodeId + + 需要认证:X-EF-Access token 或 Cookie + """ + self._log_step("步骤①", "GET /wl/api/study-plan/focus") + + url = f"{self.base_url}/wl/api/study-plan/focus" + params = {"locale": self.locale} + headers = self._learn_headers() + + if self.dry_run: + self._log(f"[DRY-RUN] GET {url} params={params}") + self._log(f"[DRY-RUN] Auth: {'X-EF-Access' if self.token else 'Cookie' if self.cookie else 'NONE'}") + return self.ctx + + resp = self.session.get(url, params=params, headers=headers, timeout=REQUEST_TIMEOUT) + resp.raise_for_status() + data = resp.json() + + if self.verbose: + self._log(f"Response: {json.dumps(data, ensure_ascii=False)[:800]}...") + + # 解析focus数据 — 兼容多种响应结构 + focus = data.get("focus", data) + + course = focus.get("course", {}) + level = focus.get("level", {}) + unit = focus.get("unit", {}) + lesson = focus.get("lesson", {}) + + self.ctx = CourseContext( + course_id=course.get("id", ""), + level_id=level.get("id", ""), + unit_id=unit.get("id", ""), + node_id=lesson.get("id", ""), + ) + + self._log(f"courseId = {self.ctx.course_id}") + self._log(f"levelId = {self.ctx.level_id}") + self._log(f"unitId = {self.ctx.unit_id}") + self._log(f"nodeId = {self.ctx.node_id}") + + if not self.ctx.course_id or not self.ctx.node_id: + raise ValueError( + "未能从focus API获取必要的courseId/nodeId。\n" + "可能原因:\n" + " 1. Token已过期(有效期约10小时)\n" + " 2. 当前用户没有进行中的课程\n" + " 3. 需要手动指定 --course-id 和 --node-id" + ) + + return self.ctx + + # -------------------------------------------------------- + # 步骤②: 注册课程会话 (获取lessonId) + # -------------------------------------------------------- + + def step_open_lesson_enrollment(self) -> str: + """POST /study/progress/enrollments/{courseId}/open-lesson → 获取lessonId + + 无需额外认证:courseId即为上下文凭证 + """ + self._log_step("步骤②", f"POST enrollments/{self.ctx.course_id}/open-lesson") + + url = f"{self.studio_api}/study/progress/enrollments/{self.ctx.course_id}/open-lesson" + body = { + "nodeId": self.ctx.node_id, + "instructionsLocale": self.instructions_locale, + "publishTag": "live", + } + headers = self._studio_headers() + + if self.dry_run: + self._log(f"[DRY-RUN] POST {url}") + self._log(f" Body: {json.dumps(body, ensure_ascii=False)}") + self.lesson.lesson_id = "dry-run-lesson-id" + return self.lesson.lesson_id + + resp = self.session.post(url, json=body, headers=headers, timeout=REQUEST_TIMEOUT) + resp.raise_for_status() + data = resp.json() + + if self.verbose: + self._log(f"Response: {json.dumps(data, ensure_ascii=False)[:500]}...") + + self.lesson.lesson_id = data.get("lessonId", "") + + self._log(f"lessonId = {self.lesson.lesson_id}") + + if not self.lesson.lesson_id: + raise ValueError("未能获取lessonId,请检查courseId和nodeId是否正确") + + return self.lesson.lesson_id + + # -------------------------------------------------------- + # 步骤③: 初始化课程 (获取session/tasks) + # -------------------------------------------------------- + + def step_open_lesson_command(self) -> list[ActivityInfo]: + """POST /study/lesson/command (open-lesson) → 获取sessionId/activities/tasks + + 无需额外认证:lessonId即为能力令牌 + """ + self._log_step("步骤③", "POST lesson/command (open-lesson)") + + url = f"{self.studio_api}/study/lesson/command" + body = { + "commandType": "open-lesson", + "commandData": { + "openLesson": { + "lessonId": self.lesson.lesson_id, + "instructionsLocale": self.instructions_locale, + } + }, + "clientState": { + "lastVersion": 0, + "lessonId": self.lesson.lesson_id, + }, + } + headers = self._studio_headers() + + if self.dry_run: + self._log(f"[DRY-RUN] POST {url}") + self._log(f" Body: {json.dumps(body, ensure_ascii=False)[:300]}...") + return [] + + resp = self.session.post(url, json=body, headers=headers, timeout=REQUEST_TIMEOUT) + resp.raise_for_status() + data = resp.json() + + # 检查命令状态 + cmd_status = data.get("commandStatus", {}) + if not cmd_status.get("successful", False): + raise ValueError(f"open-lesson命令失败: {cmd_status.get('errorMessage', 'unknown')}") + + # 提取lastVersion + self.lesson.last_version = data.get("eventHistory", {}).get("lastVersion", 0) + self._log(f"lastVersion = {self.lesson.last_version}") + + # 解析events获取sessionId和activities/tasks + events = data.get("eventHistory", {}).get("events", []) + + for ev in events: + ev_type = ev.get("type", "") + + # 提取sessionId + if ev_type == "student-joined-lesson": + joined = ev.get("data", {}).get("studentJoinedLesson", {}) + self.lesson.session_id = joined.get("sessionId", "") + self._log(f"sessionId = {self.lesson.session_id}") + + # 提取activities和tasks + if ev_type == "activity-sent": + act_data = ev.get("data", {}).get("activitySent", {}) + act = act_data.get("activity", {}) + activity_id = act.get("id", "") + tasks_raw = act.get("tasks", []) + + activity = ActivityInfo(activity_id=activity_id) + + for t in tasks_raw: + task_info = TaskInfo( + activity_id=activity_id, + task_id=t.get("id", ""), + task_type=t.get("taskType", ""), + expected_response=t.get("expectedResponse", {}), + task_detail=t.get("task", {}), + ) + activity.tasks.append(task_info) + + self.activities.append(activity) + + total_tasks = sum(len(a.tasks) for a in self.activities) + self._log(f"共获取 {len(self.activities)} 个Activity, {total_tasks} 个Task") + + for i, act in enumerate(self.activities, 1): + for j, task in enumerate(act.tasks, 1): + self._log( + f" Activity {i}/{len(self.activities)} " + f"Task {j}/{len(act.tasks)}: " + f"type={task.task_type}, " + f"id={task.task_id[:16]}..." + ) + + if not self.lesson.session_id: + raise ValueError("未能从open-lesson响应中提取sessionId") + + return self.activities + + # -------------------------------------------------------- + # 步骤④: 提交任务答案 + # -------------------------------------------------------- + + def step_submit_task(self, task: TaskInfo, time_spent: int = 5) -> SubmitResult: + """POST /study/lesson/command (submit-task-response) → 提交单个任务 + + 关键:lastVersion必须链式递增,每次提交返回新version + """ + url = f"{self.studio_api}/study/lesson/command" + + # 根据taskType构建响应内容 + builder = RESPONSE_BUILDERS.get(task.task_type) + if not builder: + self._log(f" ⚠ 未知任务类型: {task.task_type}, 使用空响应") + contents = {} + else: + contents = builder(task) + + response_body = { + "taskId": task.task_id, + "type": task.task_type, + "contents": contents, + } + + body = { + "commandType": "submit-task-response", + "commandData": { + "submitTaskResponse": { + "lessonId": self.lesson.lesson_id, + "activityId": task.activity_id, + "sessionId": self.lesson.session_id, + "response": response_body, + "timeSpentSecs": time_spent, + } + }, + "clientState": { + "lessonId": self.lesson.lesson_id, + "lastVersion": self.lesson.last_version, + }, + } + + headers = self._studio_headers() + + if self.dry_run: + self._log(f" [DRY-RUN] taskId={task.task_id[:16]}... " + f"type={task.task_type} version={self.lesson.last_version}") + return SubmitResult( + task_id=task.task_id, + task_type=task.task_type, + success=True, + new_version=self.lesson.last_version + 1, + ) + + for attempt in range(MAX_RETRIES + 1): + try: + resp = self.session.post(url, json=body, headers=headers, timeout=REQUEST_TIMEOUT) + resp.raise_for_status() + data = resp.json() + + # 更新lastVersion(链式依赖的关键!) + new_version = data.get("eventHistory", {}).get("lastVersion", 0) + cmd_status = data.get("commandStatus", {}) + success = cmd_status.get("successful", False) + + if success: + self.lesson.last_version = new_version + return SubmitResult( + task_id=task.task_id, + task_type=task.task_type, + success=True, + new_version=new_version, + ) + else: + error_msg = cmd_status.get("errorMessage", "unknown") + return SubmitResult( + task_id=task.task_id, + task_type=task.task_type, + success=False, + error=error_msg, + new_version=self.lesson.last_version, + ) + + except requests.exceptions.RequestException as e: + if attempt < MAX_RETRIES: + self._log(f" ⚠ 请求失败(尝试 {attempt+1}/{MAX_RETRIES+1}): {e}") + time.sleep(1) + continue + return SubmitResult( + task_id=task.task_id, + task_type=task.task_type, + success=False, + error=str(e), + ) + + def step_skip_task(self, task: TaskInfo) -> SubmitResult: + """POST /study/lesson/command (skip-task) → 跳过某个任务(降级策略) + + 当submit失败时,尝试skip作为回退 + """ + url = f"{self.studio_api}/study/lesson/command" + + body = { + "commandType": "skip-task", + "commandData": { + "skipTask": { + "lessonId": self.lesson.lesson_id, + "activityId": task.activity_id, + "sessionId": self.lesson.session_id, + "taskId": task.task_id, + } + }, + "clientState": { + "lessonId": self.lesson.lesson_id, + "lastVersion": self.lesson.last_version, + }, + } + + headers = self._studio_headers() + + if self.dry_run: + self._log(f" [DRY-RUN] SKIP taskId={task.task_id[:16]}... version={self.lesson.last_version}") + return SubmitResult( + task_id=task.task_id, + task_type=task.task_type, + success=True, + skipped=True, + new_version=self.lesson.last_version + 1, + ) + + try: + resp = self.session.post(url, json=body, headers=headers, timeout=REQUEST_TIMEOUT) + resp.raise_for_status() + data = resp.json() + + new_version = data.get("eventHistory", {}).get("lastVersion", 0) + cmd_status = data.get("commandStatus", {}) + success = cmd_status.get("successful", False) + + if success: + self.lesson.last_version = new_version + return SubmitResult( + task_id=task.task_id, + task_type=task.task_type, + success=True, + skipped=True, + new_version=new_version, + ) + else: + error_msg = cmd_status.get("errorMessage", "unknown") + return SubmitResult( + task_id=task.task_id, + task_type=task.task_type, + success=False, + error=f"skip-failed: {error_msg}", + ) + except requests.exceptions.RequestException as e: + return SubmitResult( + task_id=task.task_id, + task_type=task.task_type, + success=False, + error=f"skip-exception: {e}", + ) + + # -------------------------------------------------------- + # 主流程 + # -------------------------------------------------------- + + def run(self, course_id: str = "", node_id: str = "") -> bool: + """执行完整的自动完成课程流程""" + print(f"\n{'#'*60}") + print(f" EF Course Autopilot") + print(f" base_url = {self.base_url}") + print(f" studio_api= {self.studio_api}") + print(f" locale = {self.locale} timezone = {self.timezone}") + print(f" delay = {self.delay}s skip_on_fail = {self.skip_on_fail}") + print(f" dry_run = {self.dry_run} verbose = {self.verbose}") + print(f" ssl_verify= {self.verify_ssl}") + print(f" auth = {'X-EF-Access token' if self.token else 'Cookie' if self.cookie else '⚠️ NO AUTH'}") + + if not self.verify_ssl: + print(f" ⚠️ SSL证书验证已禁用") + print(f"{'#'*60}") + + if not self.token and not self.cookie: + print("\n❌ 错误:必须提供 --token 或 --cookie 参数") + return False + + start_time = time.time() + + try: + # 步骤①: 获取课程焦点 + if node_id and course_id: + self.ctx = CourseContext( + course_id=course_id, + node_id=node_id, + ) + self._log_step("步骤①", "使用手动指定的courseId/nodeId (跳过focus API)") + self._log(f"courseId = {self.ctx.course_id}") + self._log(f"nodeId = {self.ctx.node_id}") + else: + self.step_get_focus() + + # 步骤②: 注册课程会话 + self.step_open_lesson_enrollment() + + # 步骤③: 初始化课程 + self.step_open_lesson_command() + + # 步骤④: 逐个提交任务 + total_tasks = sum(len(a.tasks) for a in self.activities) + if total_tasks == 0: + self._log("\n⚠️ 没有发现任何任务,课程可能已完成或结构异常") + return True + + self._log_step("步骤④", f"开始提交任务答案 (共 {total_tasks} 个)") + + completed = 0 + for ai, activity in enumerate(self.activities, 1): + for ti, task in enumerate(activity.tasks, 1): + completed += 1 + task_label = (f"[{completed}/{total_tasks}] " + f"Act{ai}/{len(self.activities)} " + f"Task{ti}/{len(activity.tasks)} " + f"type={task.task_type}") + + # 进度条 + self._log_progress(completed, total_tasks, task.task_type) + + # 模拟真实操作时间 (3~15秒随机) + time_spent = random.randint(3, 15) + + # 提交任务 + result = self.step_submit_task(task, time_spent=time_spent) + + if result.success: + self.results.append(result) + # 详细日志输出到新行 + print() # 换行(进度条后) + self._log(f"✅ {task_label} version → {self.lesson.last_version}") + else: + print() # 换行 + self._log(f"❌ {task_label} 错误: {result.error}") + + if self.skip_on_fail: + # 降级策略:尝试skip + self._log(f" ↳ 尝试跳过任务...") + skip_result = self.step_skip_task(task) + if skip_result.success: + self._log(f" ↳ ⏭️ 已跳过 version → {self.lesson.last_version}") + skip_result.task_type = task.task_type + self.results.append(skip_result) + else: + self._log(f" ↳ ❌ 跳过也失败: {skip_result.error}") + self.results.append(result) + else: + self.results.append(result) + + # 延迟(模拟真实操作节奏) + if self.delay > 0 and not self.dry_run: + time.sleep(self.delay) + + # 汇总报告 + elapsed = time.time() - start_time + self._print_summary(total_tasks, elapsed) + + return all(r.success for r in self.results) + + except requests.exceptions.HTTPError as e: + print(f"\n❌ HTTP错误: {e}") + if e.response is not None: + print(f" 状态码: {e.response.status_code}") + print(f" 响应: {e.response.text[:500]}") + if e.response.status_code == 401: + print(f" 💡 提示:Token可能已过期,请重新获取") + elif e.response.status_code == 403: + print(f" 💡 提示:权限不足,请检查Token是否对应用户") + return False + except json.JSONDecodeError as e: + print(f"\n❌ 响应解析失败: {e}") + return False + except Exception as e: + print(f"\n❌ 执行异常: {e}") + traceback.print_exc() + return False + + def _print_summary(self, total: int, elapsed: float): + """打印执行汇总""" + success_count = sum(1 for r in self.results if r.success and not r.skipped) + skipped_count = sum(1 for r in self.results if r.skipped) + failed_count = sum(1 for r in self.results if not r.success) + + print(f"\n{'#'*60}") + print(f" 执行完成!") + print(f"{'#'*60}") + print(f" 总任务数: {total}") + print(f" ✅ 成功提交: {success_count}") + print(f" ⏭️ 跳过任务: {skipped_count}") + print(f" ❌ 失败任务: {failed_count}") + print(f" ⏱️ 总耗时: {elapsed:.1f}秒") + print(f" 📊 最终version: {self.lesson.last_version}") + + if failed_count > 0: + print(f"\n 失败任务详情:") + for r in self.results: + if not r.success: + print(f" - type={r.task_type} id={r.task_id[:16]}... error={r.error}") + + # 按任务类型统计 + type_stats: dict[str, dict] = {} + for r in self.results: + t = r.task_type + if t not in type_stats: + type_stats[t] = {"total": 0, "success": 0, "failed": 0, "skipped": 0} + type_stats[t]["total"] += 1 + if r.success and not r.skipped: + type_stats[t]["success"] += 1 + elif r.skipped: + type_stats[t]["skipped"] += 1 + else: + type_stats[t]["failed"] += 1 + + if type_stats: + print(f"\n 任务类型统计:") + for t, s in type_stats.items(): + print(f" {t:30s} {s['success']}/{s['total']} 成功" + + (f" {s['skipped']} 跳过" if s['skipped'] else "") + + (f" {s['failed']} 失败" if s['failed'] else "")) + + print(f"{'#'*60}") + + +# ============================================================ +# 命令行入口 +# ============================================================ + +def main(): + parser = argparse.ArgumentParser( + description="EF Course Autopilot — 自动完成EF课程测试脚本", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +示例: + # 最简使用:仅提供token,自动获取课程信息并完成 + python3 ef_course_autopilot.py --token "eyJraWQiOi..." + + # 使用Cookie认证(完整Cookie字符串) + python3 ef_course_autopilot.py --cookie "_ga=...; session=..." + + # 指定特定课时 + python3 ef_course_autopilot.py --token "eyJraWQiOi..." --course-id 51c6844b-xxx --node-id 6b530eb9-xxx + + # 失败时自动跳过继续 + python3 ef_course_autopilot.py --token "eyJraWQiOi..." --skip-on-fail + + # 遇到SSL证书错误时禁用验证 + python3 ef_course_autopilot.py --token "eyJraWQiOi..." --verify-ssl + + # 仅预览(不实际发送请求) + python3 ef_course_autopilot.py --token "eyJraWQiOi..." --dry-run + + # 详细模式 + python3 ef_course_autopilot.py --token "eyJraWQiOi..." --verbose + +获取token方法: + 1. 浏览器登录 learn.corporate.ef.com.cn + 2. 打开DevTools → Network + 3. 找到任意 /wl/api/ 请求 + 4. 复制 Request Headers 中的 X-EF-Access 值 + +获取Cookie方法: + 1. 浏览器登录 learn.corporate.ef.com.cn + 2. 打开DevTools → Application → Cookies + 3. 复制完整Cookie字符串 + +参数依赖说明: + --token 仅步骤①(focus)需要,步骤②-④无需认证 + --cookie 可替代--token,需包含完整Cookie字符串 + 最简输入: 仅需 --token 或 --cookie 中的一个 + """, + ) + + # 认证参数(二选一) + auth_group = parser.add_argument_group("认证参数(二选一)") + auth_group.add_argument( + "--token", default="", + help="X-EF-Access JWT token (从浏览器DevTools获取,约10小时有效)", + ) + auth_group.add_argument( + "--cookie", default="", + help="浏览器完整Cookie字符串 (可替代--token)", + ) + + # 目标参数 + target_group = parser.add_argument_group("目标参数(可选,默认自动获取)") + target_group.add_argument( + "--course-id", default="", + help="直接指定courseId (配合--node-id使用, 跳过focus API)", + ) + target_group.add_argument( + "--node-id", default="", + help="直接指定要打开的课时nodeId (配合--course-id使用)", + ) + + # 环境参数 + env_group = parser.add_argument_group("环境参数") + env_group.add_argument( + "--base-url", default=DEFAULT_BASE_URL, + help=f"EF学习平台基础URL (默认: {DEFAULT_BASE_URL})", + ) + env_group.add_argument( + "--studio-api", default=DEFAULT_STUDIO_API, + help=f"Studio API基础URL (默认: {DEFAULT_STUDIO_API})", + ) + env_group.add_argument( + "--locale", default=DEFAULT_LOCALE, + help=f"界面语言 (默认: {DEFAULT_LOCALE})", + ) + env_group.add_argument( + "--timezone", default=DEFAULT_TIMEZONE, + help=f"时区 (默认: {DEFAULT_TIMEZONE})", + ) + + # 行为参数 + behavior_group = parser.add_argument_group("行为参数") + behavior_group.add_argument( + "--delay", type=float, default=DEFAULT_DELAY, + help=f"每次submit之间的延迟秒数 (默认: {DEFAULT_DELAY})", + ) + behavior_group.add_argument( + "--skip-on-fail", action="store_true", + help="提交失败时自动跳过该任务,尝试skip-task继续", + ) + behavior_group.add_argument( + "--verify-ssl", action="store_true", + help="启用SSL证书验证(默认禁用,解决macOS证书问题)", + ) + behavior_group.add_argument( + "--dry-run", action="store_true", + help="仅打印将要执行的请求,不实际发送", + ) + behavior_group.add_argument( + "--verbose", action="store_true", + help="打印详细请求/响应信息", + ) + + args = parser.parse_args() + + # 验证认证参数 + if not args.token and not args.cookie: + parser.error("必须提供 --token 或 --cookie 参数(二选一)") + + if args.course_id and not args.node_id: + parser.error("指定了 --course-id 但缺少 --node-id,两者必须同时指定") + if args.node_id and not args.course_id: + parser.error("指定了 --node-id 但缺少 --course-id,两者必须同时指定") + + autopilot = EFCourseAutopilot( + token=args.token, + cookie=args.cookie, + base_url=args.base_url, + studio_api=args.studio_api, + locale=args.locale, + timezone=args.timezone, + delay=args.delay, + skip_on_fail=args.skip_on_fail, + verify_ssl=args.verify_ssl, + dry_run=args.dry_run, + verbose=args.verbose, + ) + + success = autopilot.run( + course_id=args.course_id, + node_id=args.node_id, + ) + + sys.exit(0 if success else 1) + + +if __name__ == "__main__": + main() diff --git a/ef_course_loop.py b/ef_course_loop.py new file mode 100755 index 0000000..a1bed47 --- /dev/null +++ b/ef_course_loop.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python3 +""" +EF Course Autopilot — 交互式循环运行脚本 + +从 token.txt 文件读取 JWT token(默认读取运行目录下的 token.txt)。 +token 需从浏览器开发者工具中获取,详见 README.md。 + +用法示例: + python3 ef_course_loop.py # 默认读取 ./token.txt + python3 ef_course_loop.py --token-file /path/to/token.txt +""" + +import argparse +import sys +import time + +from ef_course_autopilot import EFCourseAutopilot + + +def read_token(token_file: str) -> str: + """从指定文件读取 token(仅支持单行内容)""" + try: + with open(token_file, "r") as f: + token = f.read().strip() + if not token: + print(f"❌ token 文件为空: {token_file}") + sys.exit(1) + print(f" 📄 从文件读取 token ({token_file})") + return token + except FileNotFoundError: + print(f"❌ token 文件不存在: {token_file}") + print(f" 请创建 {token_file} 并将 ef_access_token 粘贴到第一行") + sys.exit(1) + except IOError as e: + print(f"❌ 读取文件失败: {e}") + sys.exit(1) + + +def main(): + parser = argparse.ArgumentParser( + description="EF Course Autopilot — 交互式循环运行", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog="""\ +使用示例: + python3 ef_course_loop.py # 默认读取 ./token.txt + python3 ef_course_loop.py --token-file /path/to/token.txt + +token.txt 说明: + - 仅能包含一行,即 ef_access_token 的值 + - 从浏览器开发者工具中取请求携带的 ef_access_token 内容,手动粘贴 +""", + ) + parser.add_argument( + "--token-file", + default="token.txt", + help="从文件读取 JWT token(默认: ./token.txt)", + ) + parser.add_argument( + "--verify-ssl", + action="store_true", + help="启用SSL证书验证(默认禁用)", + ) + + args = parser.parse_args() + + print("=" * 60) + print(" EF Course Autopilot — 交互式循环运行") + print("=" * 60) + + while True: + # 1. 从文件获取 token + token = read_token(args.token_file) + + # 2. 输入课程数量 + while True: + try: + count_str = input("请输入要完成的课程数量: ").strip() + count = int(count_str) + if count <= 0: + print("❌ 数量必须大于 0") + continue + break + except ValueError: + print("❌ 请输入有效数字") + + # 3. 循环完成课程 + total_success = 0 + total_fail = 0 + start_time = time.time() + + for i in range(count): + print(f"\n{'=' * 60}") + print(f" 第 {i + 1}/{count} 门课程") + print(f"{'=' * 60}") + + autopilot = EFCourseAutopilot( + token=token, + skip_on_fail=True, + verify_ssl=args.verify_ssl, + ) + + try: + success = autopilot.run() + if success: + total_success += 1 + else: + total_fail += 1 + except Exception as e: + print(f"\n❌ 课程执行异常: {e}") + total_fail += 1 + + elapsed = time.time() - start_time + print(f"\n{'#' * 60}") + print(f" 本轮完成!") + print(f" 计划: {count} 门") + print(f" ✅ 成功: {total_success}") + print(f" ❌ 失败: {total_fail}") + print(f" ⏱️ 耗时: {elapsed:.1f}秒") + print(f"{'#' * 60}") + + # 4. 询问是否继续 + again = input("\n是否继续?(y/n): ").strip().lower() + if again != "y": + print("\n程序结束。") + break + + +if __name__ == "__main__": + main() diff --git a/token.txt.example b/token.txt.example new file mode 100644 index 0000000..8d8a37c --- /dev/null +++ b/token.txt.example @@ -0,0 +1 @@ +eyJraWQiOiJ...(将你的 ef_access_token 粘贴在此,仅一行,不要换行)