#!/usr/bin/env python3 """ EF Course Autopilot — 自动完成EF课程流程的测试脚本 基于HAR请求依赖分析,自动完成课程的最简路径为: ① GET /wl/api/study-plan/focus → 获取courseId/nodeId ② POST /study/progress/enrollments/{courseId}/open-lesson → 获取lessonId ③ POST /study/lesson/command (open-lesson) → 获取sessionId/activityId/taskId/lastVersion ④ POST /study/lesson/command (submit-task-response) ×N → 逐个提交任务 认证机制: - 步骤① (learn.corporate域): 需要 X-EF-Access JWT token(约10小时有效期) - 步骤②-④ (api.ef.studio域): 需要 Authorization: Bearer JWT token 最简使用: python3 ef_course_autopilot.py --token "eyJraWQiOi..." 可选参数: --base-url EF学习平台基础URL (默认: https://learn.corporate.ef.com.cn) --studio-api Studio API基础URL (默认: https://api.ef.studio.ef.com.cn) --locale 界面语言 (默认: zh-CN) --timezone 时区 (默认: Asia/Shanghai) --course-id 直接指定courseId (配合--node-id使用, 跳过focus) --node-id 直接指定要打开的课时nodeId (配合--course-id使用) --delay 每次submit之间的延迟秒数 (默认: 1.0) --skip-on-fail 提交失败时自动跳过该任务继续下一个 --dry-run 仅打印将要执行的请求,不实际发送 --verbose 打印详细请求/响应信息 """ import argparse import json import random import sys import time import traceback import uuid from dataclasses import dataclass, field from typing import Any, Optional import requests # ============================================================ # 配置 # ============================================================ DEFAULT_BASE_URL = "https://learn.corporate.ef.com.cn" DEFAULT_STUDIO_API = "https://api.ef.studio.ef.com.cn" DEFAULT_LOCALE = "zh-CN" DEFAULT_TIMEZONE = "Asia/Shanghai" DEFAULT_DELAY = 1.0 # 超时设置(秒) REQUEST_TIMEOUT = 30 # 最大重试次数 MAX_RETRIES = 2 # ============================================================ # 数据类 # ============================================================ @dataclass class CourseContext: """从focus API获取的课程上下文""" course_id: str = "" level_id: str = "" unit_id: str = "" node_id: str = "" # 要打开的课时ID (= lesson.id) @dataclass class LessonSession: """课程会话信息""" lesson_id: str = "" session_id: str = "" last_version: int = 0 @dataclass class TaskInfo: """单个任务的完整信息""" activity_id: str = "" task_id: str = "" task_type: str = "" expected_response: dict = field(default_factory=dict) task_detail: dict = field(default_factory=dict) # task子结构, 包含各类型的具体数据 @dataclass class ActivityInfo: """一个Activity(含多个task)""" activity_id: str = "" tasks: list[TaskInfo] = field(default_factory=list) @dataclass class SubmitResult: """单个任务提交结果""" task_id: str = "" task_type: str = "" success: bool = False skipped: bool = False error: str = "" new_version: int = 0 # ============================================================ # 自动答题策略 # ============================================================ def build_response_media_with_time_markers(task: TaskInfo) -> dict: """media-with-time-markers: 视频标记 — 全部标记为已播放""" markers = task.task_detail.get("mediaWithTimeMarkers", {}) targets = markers.get("targets", []) target_responses = {} for t in targets: tid = t.get("id", "") target_responses[tid] = {"id": tid, "userInput": "played"} return { "mediaWithTimeMarkers": { "videoEnded": True, "targetResponses": target_responses, } } def build_response_language_focus(task: TaskInfo) -> dict: """language-focus: 语言聚焦 — 标记已查看, 全部媒体已播放""" lf = task.task_detail.get("languageFocus", {}) media_items = lf.get("mediaItems", []) media_state = {} for m in media_items: mid = m.get("id", "") media_state[mid] = {"id": mid, "userInput": "played"} return { "languageFocus": { "contentSeen": True, "mediaState": media_state, } } def build_response_sequencing(task: TaskInfo) -> dict: """sequencing: 排序题 — 优先使用expectedResponse中的正确顺序""" expected = task.expected_response.get("contents", {}).get("sequencing", {}) if expected: return {"sequencing": expected} # 回退:按items出现顺序分配0,1,2... seq = task.task_detail.get("sequencing", {}) items = seq.get("items", []) result = {} for i, item in enumerate(items): iid = item.get("id", "") result[iid] = {"id": iid, "userInput": i} return {"sequencing": result} def build_response_gapfill(task: TaskInfo) -> dict: """gapfill: 填空题 — 优先使用expectedResponse中的正确答案""" expected = task.expected_response.get("contents", {}).get("gapfill", {}) if expected: return {"gapfill": expected} # 回退:从task.detail获取gap列表, 填入空字符串 gf = task.task_detail.get("gapfill", {}) gaps = gf.get("gaps", gf.get("items", [])) result = {} for g in gaps: gid = g.get("id", "") result[gid] = {"id": gid, "userInput": ""} return {"gapfill": result} def build_response_matching(task: TaskInfo) -> dict: """matching: 匹配题 — 优先使用expectedResponse中的正确匹配""" expected = task.expected_response.get("contents", {}).get("matching", {}) if expected: return {"matching": expected} # 回退:从task.detail获取items, 空匹配 mf = task.task_detail.get("matching", {}) items = mf.get("items", []) result = {} for item in items: iid = item.get("id", "") result[iid] = {"id": iid, "userInput": ""} return {"matching": result} def build_response_speaking_practice(task: TaskInfo) -> dict: """speaking-practice: 口语练习 — 使用expectedResponse + 低分""" expected = task.expected_response.get("contents", {}).get("speakingPractice", {}) if expected: user_input = expected.get("userInput", {}) if isinstance(user_input, dict): user_input.setdefault("speechScoreSummary", { "score": 0.4, "source": "", "sourceResponse": {}, "wordScores": [], "alternativeInputScores": {}, }) user_input.setdefault("data", []) return {"speakingPractice": expected} # 回退:构造最低限度通过的回答 sp = task.task_detail.get("speakingPractice", {}) target_text = sp.get("targetText", "Hello.") target_locale = sp.get("targetLocale", "en_US") return { "speakingPractice": { "userInput": { "data": [], "id": "", "speechScoreSummary": { "alternativeInputScores": {}, "score": 0.4, "source": "", "sourceResponse": {}, "wordScores": [], }, "targetLocale": target_locale, "targetText": target_text, } } } def build_response_multiple_choice(task: TaskInfo) -> dict: """multiple-choice: 选择题 — 优先使用expectedResponse中的正确选项""" expected = task.expected_response.get("contents", {}).get("multipleChoice", {}) if expected: return {"multipleChoice": expected} # 回退:从task.detail获取options, 随机选一个 mc = task.task_detail.get("multipleChoice", {}) contents = mc.get("contents", {}) options = mc.get("options", list(contents.keys()) if contents else []) result = {} selected = False for oid in (options if isinstance(options, list) else [options]): if isinstance(oid, dict): oid = oid.get("id", "") result[oid] = {"id": oid, "userInput": "selected" if not selected else "not-selected"} selected = True return {"multipleChoice": {"contents": result}} def build_response_text_highlights(task: TaskInfo) -> dict: """text-highlights: 文本高亮 — 标记所有内容已查看""" expected = task.expected_response.get("contents", {}).get("textHighlights", {}) if expected: return {"textHighlights": expected} th = task.task_detail.get("textHighlights", {}) highlights = th.get("highlights", th.get("items", [])) result = {} for h in highlights: hid = h.get("id", "") result[hid] = {"id": hid, "userInput": "viewed"} return {"textHighlights": result} def build_response_flashcards(task: TaskInfo) -> dict: """flashcards: 闪卡 — 优先使用expectedResponse,否则标记所有卡片已翻转、媒体已播放""" expected = task.expected_response.get("contents", {}).get("flashcards", {}) if expected: return {"flashcards": expected} # 回退:从task.detail获取卡片信息,标记全部已翻转/播放 fc = task.task_detail.get("flashcards", {}).get("flashcards", []) cards_state = {} media_state = {} for card in fc: cid = card.get("id", "") if cid: cards_state[cid] = {"id": cid, "userInput": "flipped"} audio_id = card.get("audio", {}).get("id", "") if audio_id: media_state[audio_id] = {"id": audio_id, "userInput": "played"} image_id = card.get("image", {}).get("id", "") if image_id: media_state[image_id] = {"id": image_id, "userInput": "played"} return { "flashcards": { "contentSeen": True, "cardsState": cards_state, "mediaState": media_state, } } def build_response_categorisation(task: TaskInfo) -> dict: """categorisation: 分类题 — 优先使用expectedResponse,否则提交空映射""" expected = task.expected_response.get("contents", {}).get("categorisation", {}) if expected: return {"categorisation": expected} # 回退:从task.detail获取items,全部设为未分类 cat = task.task_detail.get("categorisation", {}) items = cat.get("items", []) result = {} for item in items: iid = item.get("id", "") result[iid] = {"id": iid, "userInput": ""} return {"categorisation": result} # 任务类型 → 答题策略映射 RESPONSE_BUILDERS = { "media-with-time-markers": build_response_media_with_time_markers, "language-focus": build_response_language_focus, "sequencing": build_response_sequencing, "gapfill": build_response_gapfill, "matching": build_response_matching, "flashcards": build_response_flashcards, "speaking-practice": build_response_speaking_practice, "multiple-choice": build_response_multiple_choice, "text-highlights": build_response_text_highlights, "categorisation": build_response_categorisation, } # ============================================================ # 核心客户端 # ============================================================ class EFCourseAutopilot: """EF课程自动完成客户端 认证机制: - 步骤① (learn.corporate域): 需要 X-EF-Access JWT token - 步骤②-④ (api.ef.studio域): 需要 Authorization: Bearer JWT token """ def __init__( self, token: str = "", cookie: str = "", base_url: str = DEFAULT_BASE_URL, studio_api: str = DEFAULT_STUDIO_API, locale: str = DEFAULT_LOCALE, timezone: str = DEFAULT_TIMEZONE, delay: float = DEFAULT_DELAY, skip_on_fail: bool = False, verify_ssl: bool = False, dry_run: bool = False, verbose: bool = False, ): self.token = token self.cookie = cookie self.base_url = base_url.rstrip("/") self.studio_api = studio_api.rstrip("/") self.locale = locale self.timezone = timezone self.delay = delay self.skip_on_fail = skip_on_fail self.verify_ssl = verify_ssl self.dry_run = dry_run self.verbose = verbose # 根据locale推断instructionsLocale (zh-CN → zh_CN) self.instructions_locale = locale.replace("-", "_") if self.instructions_locale not in ("zh_CN", "en_US", "ja_JP", "ko_KR", "de_DE", "fr_FR", "es_ES", "pt_BR"): self.instructions_locale = "en_US" self.session = requests.Session() self._setup_session() # 状态 self.ctx = CourseContext() self.lesson = LessonSession() self.activities: list[ActivityInfo] = [] self.results: list[SubmitResult] = [] def _setup_session(self): """配置HTTP会话的通用headers和SSL设置""" self.session.headers.update({ "Accept": "application/json, text/plain, */*", "Content-Type": "application/json", "User-Agent": ( "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/148.0.0.0 Safari/537.36" ), "Cache-Control": "no-cache", "Pragma": "no-cache", }) # SSL证书验证 if not self.verify_ssl: self.session.verify = False # 禁用InsecureRequestWarning警告 import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) def _learn_headers(self) -> dict: """learn.corporate域名的请求headers (需要X-EF-Access或Cookie) 认证优先级: 1. X-EF-Access token (JWT, ~10小时有效) 2. Cookie (浏览器完整Cookie字符串) """ headers = { "Referer": f"{self.base_url}/wl/study-plan?product=b2b", "x-ef-correlation-id": self._correlation_id(), } if self.token: headers["X-EF-Access"] = self.token if self.cookie: headers["Cookie"] = self.cookie return headers def _studio_headers(self) -> dict: """api.ef.studio域名的请求headers 需要 Authorization: Bearer JWT token。 """ headers = { "Origin": "https://lesson-player.ef.studio.ef.com.cn", "Referer": "https://lesson-player.ef.studio.ef.com.cn/", "x-ef-correlation-id": self._correlation_id(), } if self.token: headers["Authorization"] = f"Bearer {self.token}" return headers def _correlation_id(self) -> str: """生成x-ef-correlation-id (UUID格式)""" return str(uuid.uuid4()) def _log(self, msg: str): """普通日志""" print(f" {msg}") def _log_step(self, step: str, msg: str): """步骤日志""" print(f"\n{'='*60}") print(f" {step}: {msg}") print(f"{'='*60}") def _log_progress(self, current: int, total: int, label: str): """进度日志""" pct = current / total * 100 if total > 0 else 0 bar_len = 30 filled = int(bar_len * current / total) if total > 0 else 0 bar = "█" * filled + "░" * (bar_len - filled) print(f"\r [{bar}] {pct:.0f}% ({current}/{total}) {label}", end="", flush=True) # -------------------------------------------------------- # 步骤①: 获取课程焦点 # -------------------------------------------------------- def step_get_focus(self) -> CourseContext: """GET /wl/api/study-plan/focus → 获取courseId/levelId/unitId/nodeId 需要认证:X-EF-Access token 或 Cookie """ self._log_step("步骤①", "GET /wl/api/study-plan/focus") url = f"{self.base_url}/wl/api/study-plan/focus" params = {"locale": self.locale} headers = self._learn_headers() if self.dry_run: self._log(f"[DRY-RUN] GET {url} params={params}") self._log(f"[DRY-RUN] Auth: {'X-EF-Access' if self.token else 'Cookie' if self.cookie else 'NONE'}") return self.ctx resp = self.session.get(url, params=params, headers=headers, timeout=REQUEST_TIMEOUT) resp.raise_for_status() data = resp.json() if self.verbose: self._log(f"Response: {json.dumps(data, ensure_ascii=False)[:800]}...") # 解析focus数据 — 兼容多种响应结构 focus = data.get("focus", data) course = focus.get("course", {}) level = focus.get("level", {}) unit = focus.get("unit", {}) lesson = focus.get("lesson", {}) self.ctx = CourseContext( course_id=course.get("id", ""), level_id=level.get("id", ""), unit_id=unit.get("id", ""), node_id=lesson.get("id", ""), ) self._log(f"courseId = {self.ctx.course_id}") self._log(f"levelId = {self.ctx.level_id}") self._log(f"unitId = {self.ctx.unit_id}") self._log(f"nodeId = {self.ctx.node_id}") if not self.ctx.course_id or not self.ctx.node_id: raise ValueError( "未能从focus API获取必要的courseId/nodeId。\n" "可能原因:\n" " 1. Token已过期(有效期约10小时)\n" " 2. 当前用户没有进行中的课程\n" " 3. 需要手动指定 --course-id 和 --node-id" ) return self.ctx # -------------------------------------------------------- # 步骤②: 注册课程会话 (获取lessonId) # -------------------------------------------------------- def step_open_lesson_enrollment(self) -> str: """POST /study/progress/enrollments/{courseId}/open-lesson → 获取lessonId 无需额外认证:courseId即为上下文凭证 """ self._log_step("步骤②", f"POST enrollments/{self.ctx.course_id}/open-lesson") url = f"{self.studio_api}/study/progress/enrollments/{self.ctx.course_id}/open-lesson" body = { "nodeId": self.ctx.node_id, "instructionsLocale": self.instructions_locale, "publishTag": "live", } headers = self._studio_headers() if self.dry_run: self._log(f"[DRY-RUN] POST {url}") self._log(f" Body: {json.dumps(body, ensure_ascii=False)}") self.lesson.lesson_id = "dry-run-lesson-id" return self.lesson.lesson_id resp = self.session.post(url, json=body, headers=headers, timeout=REQUEST_TIMEOUT) resp.raise_for_status() data = resp.json() if self.verbose: self._log(f"Response: {json.dumps(data, ensure_ascii=False)[:500]}...") self.lesson.lesson_id = data.get("lessonId", "") self._log(f"lessonId = {self.lesson.lesson_id}") if not self.lesson.lesson_id: raise ValueError("未能获取lessonId,请检查courseId和nodeId是否正确") return self.lesson.lesson_id # -------------------------------------------------------- # 步骤③: 初始化课程 (获取session/tasks) # -------------------------------------------------------- def step_open_lesson_command(self) -> list[ActivityInfo]: """POST /study/lesson/command (open-lesson) → 获取sessionId/activities/tasks 无需额外认证:lessonId即为能力令牌 """ self._log_step("步骤③", "POST lesson/command (open-lesson)") url = f"{self.studio_api}/study/lesson/command" body = { "commandType": "open-lesson", "commandData": { "openLesson": { "lessonId": self.lesson.lesson_id, "instructionsLocale": self.instructions_locale, } }, "clientState": { "lastVersion": 0, "lessonId": self.lesson.lesson_id, }, } headers = self._studio_headers() if self.dry_run: self._log(f"[DRY-RUN] POST {url}") self._log(f" Body: {json.dumps(body, ensure_ascii=False)[:300]}...") return [] resp = self.session.post(url, json=body, headers=headers, timeout=REQUEST_TIMEOUT) resp.raise_for_status() data = resp.json() # 检查命令状态 cmd_status = data.get("commandStatus", {}) if not cmd_status.get("successful", False): raise ValueError(f"open-lesson命令失败: {cmd_status.get('errorMessage', 'unknown')}") # 提取lastVersion self.lesson.last_version = data.get("eventHistory", {}).get("lastVersion", 0) self._log(f"lastVersion = {self.lesson.last_version}") # 解析events获取sessionId和activities/tasks events = data.get("eventHistory", {}).get("events", []) for ev in events: ev_type = ev.get("type", "") # 提取sessionId if ev_type == "student-joined-lesson": joined = ev.get("data", {}).get("studentJoinedLesson", {}) self.lesson.session_id = joined.get("sessionId", "") self._log(f"sessionId = {self.lesson.session_id}") # 提取activities和tasks if ev_type == "activity-sent": act_data = ev.get("data", {}).get("activitySent", {}) act = act_data.get("activity", {}) activity_id = act.get("id", "") tasks_raw = act.get("tasks", []) activity = ActivityInfo(activity_id=activity_id) for t in tasks_raw: task_info = TaskInfo( activity_id=activity_id, task_id=t.get("id", ""), task_type=t.get("taskType", ""), expected_response=t.get("expectedResponse", {}), task_detail=t.get("task", {}), ) activity.tasks.append(task_info) self.activities.append(activity) total_tasks = sum(len(a.tasks) for a in self.activities) self._log(f"共获取 {len(self.activities)} 个Activity, {total_tasks} 个Task") for i, act in enumerate(self.activities, 1): for j, task in enumerate(act.tasks, 1): self._log( f" Activity {i}/{len(self.activities)} " f"Task {j}/{len(act.tasks)}: " f"type={task.task_type}, " f"id={task.task_id[:16]}..." ) if not self.lesson.session_id: raise ValueError("未能从open-lesson响应中提取sessionId") return self.activities # -------------------------------------------------------- # 步骤④: 提交任务答案 # -------------------------------------------------------- def step_submit_task(self, task: TaskInfo, time_spent: int = 5) -> SubmitResult: """POST /study/lesson/command (submit-task-response) → 提交单个任务 关键:lastVersion必须链式递增,每次提交返回新version """ url = f"{self.studio_api}/study/lesson/command" # 根据taskType构建响应内容 builder = RESPONSE_BUILDERS.get(task.task_type) if not builder: self._log(f" ⚠ 未知任务类型: {task.task_type}, 使用空响应") contents = {} else: contents = builder(task) response_body = { "taskId": task.task_id, "type": task.task_type, "contents": contents, } body = { "commandType": "submit-task-response", "commandData": { "submitTaskResponse": { "lessonId": self.lesson.lesson_id, "activityId": task.activity_id, "sessionId": self.lesson.session_id, "response": response_body, "timeSpentSecs": time_spent, } }, "clientState": { "lessonId": self.lesson.lesson_id, "lastVersion": self.lesson.last_version, }, } headers = self._studio_headers() if self.dry_run: self._log(f" [DRY-RUN] taskId={task.task_id[:16]}... " f"type={task.task_type} version={self.lesson.last_version}") return SubmitResult( task_id=task.task_id, task_type=task.task_type, success=True, new_version=self.lesson.last_version + 1, ) for attempt in range(MAX_RETRIES + 1): try: resp = self.session.post(url, json=body, headers=headers, timeout=REQUEST_TIMEOUT) resp.raise_for_status() data = resp.json() # 更新lastVersion(链式依赖的关键!) new_version = data.get("eventHistory", {}).get("lastVersion", 0) cmd_status = data.get("commandStatus", {}) success = cmd_status.get("successful", False) if success: self.lesson.last_version = new_version return SubmitResult( task_id=task.task_id, task_type=task.task_type, success=True, new_version=new_version, ) else: error_msg = cmd_status.get("errorMessage", "unknown") return SubmitResult( task_id=task.task_id, task_type=task.task_type, success=False, error=error_msg, new_version=self.lesson.last_version, ) except requests.exceptions.RequestException as e: if attempt < MAX_RETRIES: self._log(f" ⚠ 请求失败(尝试 {attempt+1}/{MAX_RETRIES+1}): {e}") time.sleep(1) continue return SubmitResult( task_id=task.task_id, task_type=task.task_type, success=False, error=str(e), ) def step_skip_task(self, task: TaskInfo) -> SubmitResult: """POST /study/lesson/command (skip-task) → 跳过某个任务(降级策略) 当submit失败时,尝试skip作为回退 """ url = f"{self.studio_api}/study/lesson/command" body = { "commandType": "skip-task", "commandData": { "skipTask": { "lessonId": self.lesson.lesson_id, "activityId": task.activity_id, "sessionId": self.lesson.session_id, "taskId": task.task_id, } }, "clientState": { "lessonId": self.lesson.lesson_id, "lastVersion": self.lesson.last_version, }, } headers = self._studio_headers() if self.dry_run: self._log(f" [DRY-RUN] SKIP taskId={task.task_id[:16]}... version={self.lesson.last_version}") return SubmitResult( task_id=task.task_id, task_type=task.task_type, success=True, skipped=True, new_version=self.lesson.last_version + 1, ) try: resp = self.session.post(url, json=body, headers=headers, timeout=REQUEST_TIMEOUT) resp.raise_for_status() data = resp.json() new_version = data.get("eventHistory", {}).get("lastVersion", 0) cmd_status = data.get("commandStatus", {}) success = cmd_status.get("successful", False) if success: self.lesson.last_version = new_version return SubmitResult( task_id=task.task_id, task_type=task.task_type, success=True, skipped=True, new_version=new_version, ) else: error_msg = cmd_status.get("errorMessage", "unknown") return SubmitResult( task_id=task.task_id, task_type=task.task_type, success=False, error=f"skip-failed: {error_msg}", ) except requests.exceptions.RequestException as e: return SubmitResult( task_id=task.task_id, task_type=task.task_type, success=False, error=f"skip-exception: {e}", ) # -------------------------------------------------------- # 主流程 # -------------------------------------------------------- def run(self, course_id: str = "", node_id: str = "") -> bool: """执行完整的自动完成课程流程""" print(f"\n{'#'*60}") print(f" EF Course Autopilot") print(f" base_url = {self.base_url}") print(f" studio_api= {self.studio_api}") print(f" locale = {self.locale} timezone = {self.timezone}") print(f" delay = {self.delay}s skip_on_fail = {self.skip_on_fail}") print(f" dry_run = {self.dry_run} verbose = {self.verbose}") print(f" ssl_verify= {self.verify_ssl}") print(f" auth = {'X-EF-Access token' if self.token else 'Cookie' if self.cookie else '⚠️ NO AUTH'}") if not self.verify_ssl: print(f" ⚠️ SSL证书验证已禁用") print(f"{'#'*60}") if not self.token and not self.cookie: print("\n❌ 错误:必须提供 --token 或 --cookie 参数") return False start_time = time.time() try: # 步骤①: 获取课程焦点 if node_id and course_id: self.ctx = CourseContext( course_id=course_id, node_id=node_id, ) self._log_step("步骤①", "使用手动指定的courseId/nodeId (跳过focus API)") self._log(f"courseId = {self.ctx.course_id}") self._log(f"nodeId = {self.ctx.node_id}") else: self.step_get_focus() # 步骤②: 注册课程会话 self.step_open_lesson_enrollment() # 步骤③: 初始化课程 self.step_open_lesson_command() # 步骤④: 逐个提交任务 total_tasks = sum(len(a.tasks) for a in self.activities) if total_tasks == 0: self._log("\n⚠️ 没有发现任何任务,课程可能已完成或结构异常") return True self._log_step("步骤④", f"开始提交任务答案 (共 {total_tasks} 个)") completed = 0 for ai, activity in enumerate(self.activities, 1): for ti, task in enumerate(activity.tasks, 1): completed += 1 task_label = (f"[{completed}/{total_tasks}] " f"Act{ai}/{len(self.activities)} " f"Task{ti}/{len(activity.tasks)} " f"type={task.task_type}") # 进度条 self._log_progress(completed, total_tasks, task.task_type) # 模拟真实操作时间 (3~15秒随机) time_spent = random.randint(3, 15) # 提交任务 result = self.step_submit_task(task, time_spent=time_spent) if result.success: self.results.append(result) # 详细日志输出到新行 print() # 换行(进度条后) self._log(f"✅ {task_label} version → {self.lesson.last_version}") else: print() # 换行 self._log(f"❌ {task_label} 错误: {result.error}") if self.skip_on_fail: # 降级策略:尝试skip self._log(f" ↳ 尝试跳过任务...") skip_result = self.step_skip_task(task) if skip_result.success: self._log(f" ↳ ⏭️ 已跳过 version → {self.lesson.last_version}") skip_result.task_type = task.task_type self.results.append(skip_result) else: self._log(f" ↳ ❌ 跳过也失败: {skip_result.error}") self.results.append(result) else: self.results.append(result) # 延迟(模拟真实操作节奏) if self.delay > 0 and not self.dry_run: time.sleep(self.delay) # 汇总报告 elapsed = time.time() - start_time self._print_summary(total_tasks, elapsed) return all(r.success for r in self.results) except requests.exceptions.HTTPError as e: print(f"\n❌ HTTP错误: {e}") if e.response is not None: print(f" 状态码: {e.response.status_code}") print(f" 响应: {e.response.text[:500]}") if e.response.status_code == 401: print(f" 💡 提示:Token可能已过期,请重新获取") elif e.response.status_code == 403: print(f" 💡 提示:权限不足,请检查Token是否对应用户") return False except json.JSONDecodeError as e: print(f"\n❌ 响应解析失败: {e}") return False except Exception as e: print(f"\n❌ 执行异常: {e}") traceback.print_exc() return False def _print_summary(self, total: int, elapsed: float): """打印执行汇总""" success_count = sum(1 for r in self.results if r.success and not r.skipped) skipped_count = sum(1 for r in self.results if r.skipped) failed_count = sum(1 for r in self.results if not r.success) print(f"\n{'#'*60}") print(f" 执行完成!") print(f"{'#'*60}") print(f" 总任务数: {total}") print(f" ✅ 成功提交: {success_count}") print(f" ⏭️ 跳过任务: {skipped_count}") print(f" ❌ 失败任务: {failed_count}") print(f" ⏱️ 总耗时: {elapsed:.1f}秒") print(f" 📊 最终version: {self.lesson.last_version}") if failed_count > 0: print(f"\n 失败任务详情:") for r in self.results: if not r.success: print(f" - type={r.task_type} id={r.task_id[:16]}... error={r.error}") # 按任务类型统计 type_stats: dict[str, dict] = {} for r in self.results: t = r.task_type if t not in type_stats: type_stats[t] = {"total": 0, "success": 0, "failed": 0, "skipped": 0} type_stats[t]["total"] += 1 if r.success and not r.skipped: type_stats[t]["success"] += 1 elif r.skipped: type_stats[t]["skipped"] += 1 else: type_stats[t]["failed"] += 1 if type_stats: print(f"\n 任务类型统计:") for t, s in type_stats.items(): print(f" {t:30s} {s['success']}/{s['total']} 成功" + (f" {s['skipped']} 跳过" if s['skipped'] else "") + (f" {s['failed']} 失败" if s['failed'] else "")) print(f"{'#'*60}") # ============================================================ # 命令行入口 # ============================================================ def main(): parser = argparse.ArgumentParser( description="EF Course Autopilot — 自动完成EF课程测试脚本", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例: # 最简使用:仅提供token,自动获取课程信息并完成 python3 ef_course_autopilot.py --token "eyJraWQiOi..." # 使用Cookie认证(完整Cookie字符串) python3 ef_course_autopilot.py --cookie "_ga=...; session=..." # 指定特定课时 python3 ef_course_autopilot.py --token "eyJraWQiOi..." --course-id 51c6844b-xxx --node-id 6b530eb9-xxx # 失败时自动跳过继续 python3 ef_course_autopilot.py --token "eyJraWQiOi..." --skip-on-fail # 遇到SSL证书错误时禁用验证 python3 ef_course_autopilot.py --token "eyJraWQiOi..." --verify-ssl # 仅预览(不实际发送请求) python3 ef_course_autopilot.py --token "eyJraWQiOi..." --dry-run # 详细模式 python3 ef_course_autopilot.py --token "eyJraWQiOi..." --verbose 获取token方法: 1. 浏览器登录 learn.corporate.ef.com.cn 2. 打开DevTools → Network 3. 找到任意 /wl/api/ 请求 4. 复制 Request Headers 中的 X-EF-Access 值 获取Cookie方法: 1. 浏览器登录 learn.corporate.ef.com.cn 2. 打开DevTools → Application → Cookies 3. 复制完整Cookie字符串 参数依赖说明: --token 仅步骤①(focus)需要,步骤②-④无需认证 --cookie 可替代--token,需包含完整Cookie字符串 最简输入: 仅需 --token 或 --cookie 中的一个 """, ) # 认证参数(二选一) auth_group = parser.add_argument_group("认证参数(二选一)") auth_group.add_argument( "--token", default="", help="X-EF-Access JWT token (从浏览器DevTools获取,约10小时有效)", ) auth_group.add_argument( "--cookie", default="", help="浏览器完整Cookie字符串 (可替代--token)", ) # 目标参数 target_group = parser.add_argument_group("目标参数(可选,默认自动获取)") target_group.add_argument( "--course-id", default="", help="直接指定courseId (配合--node-id使用, 跳过focus API)", ) target_group.add_argument( "--node-id", default="", help="直接指定要打开的课时nodeId (配合--course-id使用)", ) # 环境参数 env_group = parser.add_argument_group("环境参数") env_group.add_argument( "--base-url", default=DEFAULT_BASE_URL, help=f"EF学习平台基础URL (默认: {DEFAULT_BASE_URL})", ) env_group.add_argument( "--studio-api", default=DEFAULT_STUDIO_API, help=f"Studio API基础URL (默认: {DEFAULT_STUDIO_API})", ) env_group.add_argument( "--locale", default=DEFAULT_LOCALE, help=f"界面语言 (默认: {DEFAULT_LOCALE})", ) env_group.add_argument( "--timezone", default=DEFAULT_TIMEZONE, help=f"时区 (默认: {DEFAULT_TIMEZONE})", ) # 行为参数 behavior_group = parser.add_argument_group("行为参数") behavior_group.add_argument( "--delay", type=float, default=DEFAULT_DELAY, help=f"每次submit之间的延迟秒数 (默认: {DEFAULT_DELAY})", ) behavior_group.add_argument( "--skip-on-fail", action="store_true", help="提交失败时自动跳过该任务,尝试skip-task继续", ) behavior_group.add_argument( "--verify-ssl", action="store_true", help="启用SSL证书验证(默认禁用,解决macOS证书问题)", ) behavior_group.add_argument( "--dry-run", action="store_true", help="仅打印将要执行的请求,不实际发送", ) behavior_group.add_argument( "--verbose", action="store_true", help="打印详细请求/响应信息", ) args = parser.parse_args() # 验证认证参数 if not args.token and not args.cookie: parser.error("必须提供 --token 或 --cookie 参数(二选一)") if args.course_id and not args.node_id: parser.error("指定了 --course-id 但缺少 --node-id,两者必须同时指定") if args.node_id and not args.course_id: parser.error("指定了 --node-id 但缺少 --course-id,两者必须同时指定") autopilot = EFCourseAutopilot( token=args.token, cookie=args.cookie, base_url=args.base_url, studio_api=args.studio_api, locale=args.locale, timezone=args.timezone, delay=args.delay, skip_on_fail=args.skip_on_fail, verify_ssl=args.verify_ssl, dry_run=args.dry_run, verbose=args.verbose, ) success = autopilot.run( course_id=args.course_id, node_id=args.node_id, ) sys.exit(0 if success else 1) if __name__ == "__main__": main()