Files
EFAuto/ef_course_autopilot.py
cat-shark 14433589ac 新增 categorisation 任务类型支持
HAR 文件分析发现新的任务类型 categorisation(分类题),
新增对应的答题策略函数并注册到 RESPONSE_BUILDERS 字典中。
优先使用服务器返回的 expectedResponse,回退时提交空映射。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-20 22:51:51 +08:00

1129 lines
41 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
EF Course Autopilot — 自动完成EF课程流程的测试脚本
基于HAR请求依赖分析,自动完成课程的最简路径为:
① GET /wl/api/study-plan/focus → 获取courseId/nodeId
② POST /study/progress/enrollments/{courseId}/open-lesson → 获取lessonId
③ POST /study/lesson/command (open-lesson) → 获取sessionId/activityId/taskId/lastVersion
④ POST /study/lesson/command (submit-task-response) ×N → 逐个提交任务
认证机制:
- 步骤① (learn.corporate域): 需要 X-EF-Access JWT token(约10小时有效期)
- 步骤②-④ (api.ef.studio域): 需要 Authorization: Bearer JWT token
最简使用:
python3 ef_course_autopilot.py --token "eyJraWQiOi..."
可选参数:
--base-url EF学习平台基础URL (默认: https://learn.corporate.ef.com.cn)
--studio-api Studio API基础URL (默认: https://api.ef.studio.ef.com.cn)
--locale 界面语言 (默认: zh-CN)
--timezone 时区 (默认: Asia/Shanghai)
--course-id 直接指定courseId (配合--node-id使用, 跳过focus)
--node-id 直接指定要打开的课时nodeId (配合--course-id使用)
--delay 每次submit之间的延迟秒数 (默认: 1.0)
--skip-on-fail 提交失败时自动跳过该任务继续下一个
--dry-run 仅打印将要执行的请求,不实际发送
--verbose 打印详细请求/响应信息
"""
import argparse
import json
import random
import sys
import time
import traceback
import uuid
from dataclasses import dataclass, field
from typing import Any, Optional
import requests
# ============================================================
# 配置
# ============================================================
DEFAULT_BASE_URL = "https://learn.corporate.ef.com.cn"
DEFAULT_STUDIO_API = "https://api.ef.studio.ef.com.cn"
DEFAULT_LOCALE = "zh-CN"
DEFAULT_TIMEZONE = "Asia/Shanghai"
DEFAULT_DELAY = 1.0
# 超时设置(秒)
REQUEST_TIMEOUT = 30
# 最大重试次数
MAX_RETRIES = 2
# ============================================================
# 数据类
# ============================================================
@dataclass
class CourseContext:
"""从focus API获取的课程上下文"""
course_id: str = ""
level_id: str = ""
unit_id: str = ""
node_id: str = "" # 要打开的课时ID (= lesson.id)
@dataclass
class LessonSession:
"""课程会话信息"""
lesson_id: str = ""
session_id: str = ""
last_version: int = 0
@dataclass
class TaskInfo:
"""单个任务的完整信息"""
activity_id: str = ""
task_id: str = ""
task_type: str = ""
expected_response: dict = field(default_factory=dict)
task_detail: dict = field(default_factory=dict) # task子结构, 包含各类型的具体数据
@dataclass
class ActivityInfo:
"""一个Activity(含多个task"""
activity_id: str = ""
tasks: list[TaskInfo] = field(default_factory=list)
@dataclass
class SubmitResult:
"""单个任务提交结果"""
task_id: str = ""
task_type: str = ""
success: bool = False
skipped: bool = False
error: str = ""
new_version: int = 0
# ============================================================
# 自动答题策略
# ============================================================
def build_response_media_with_time_markers(task: TaskInfo) -> dict:
"""media-with-time-markers: 视频标记 — 全部标记为已播放"""
markers = task.task_detail.get("mediaWithTimeMarkers", {})
targets = markers.get("targets", [])
target_responses = {}
for t in targets:
tid = t.get("id", "")
target_responses[tid] = {"id": tid, "userInput": "played"}
return {
"mediaWithTimeMarkers": {
"videoEnded": True,
"targetResponses": target_responses,
}
}
def build_response_language_focus(task: TaskInfo) -> dict:
"""language-focus: 语言聚焦 — 标记已查看, 全部媒体已播放"""
lf = task.task_detail.get("languageFocus", {})
media_items = lf.get("mediaItems", [])
media_state = {}
for m in media_items:
mid = m.get("id", "")
media_state[mid] = {"id": mid, "userInput": "played"}
return {
"languageFocus": {
"contentSeen": True,
"mediaState": media_state,
}
}
def build_response_sequencing(task: TaskInfo) -> dict:
"""sequencing: 排序题 — 优先使用expectedResponse中的正确顺序"""
expected = task.expected_response.get("contents", {}).get("sequencing", {})
if expected:
return {"sequencing": expected}
# 回退:按items出现顺序分配0,1,2...
seq = task.task_detail.get("sequencing", {})
items = seq.get("items", [])
result = {}
for i, item in enumerate(items):
iid = item.get("id", "")
result[iid] = {"id": iid, "userInput": i}
return {"sequencing": result}
def build_response_gapfill(task: TaskInfo) -> dict:
"""gapfill: 填空题 — 优先使用expectedResponse中的正确答案"""
expected = task.expected_response.get("contents", {}).get("gapfill", {})
if expected:
return {"gapfill": expected}
# 回退:从task.detail获取gap列表, 填入空字符串
gf = task.task_detail.get("gapfill", {})
gaps = gf.get("gaps", gf.get("items", []))
result = {}
for g in gaps:
gid = g.get("id", "")
result[gid] = {"id": gid, "userInput": ""}
return {"gapfill": result}
def build_response_matching(task: TaskInfo) -> dict:
"""matching: 匹配题 — 优先使用expectedResponse中的正确匹配"""
expected = task.expected_response.get("contents", {}).get("matching", {})
if expected:
return {"matching": expected}
# 回退:从task.detail获取items, 空匹配
mf = task.task_detail.get("matching", {})
items = mf.get("items", [])
result = {}
for item in items:
iid = item.get("id", "")
result[iid] = {"id": iid, "userInput": ""}
return {"matching": result}
def build_response_speaking_practice(task: TaskInfo) -> dict:
"""speaking-practice: 口语练习 — 使用expectedResponse + 低分"""
expected = task.expected_response.get("contents", {}).get("speakingPractice", {})
if expected:
user_input = expected.get("userInput", {})
if isinstance(user_input, dict):
user_input.setdefault("speechScoreSummary", {
"score": 0.4,
"source": "",
"sourceResponse": {},
"wordScores": [],
"alternativeInputScores": {},
})
user_input.setdefault("data", [])
return {"speakingPractice": expected}
# 回退:构造最低限度通过的回答
sp = task.task_detail.get("speakingPractice", {})
target_text = sp.get("targetText", "Hello.")
target_locale = sp.get("targetLocale", "en_US")
return {
"speakingPractice": {
"userInput": {
"data": [],
"id": "",
"speechScoreSummary": {
"alternativeInputScores": {},
"score": 0.4,
"source": "",
"sourceResponse": {},
"wordScores": [],
},
"targetLocale": target_locale,
"targetText": target_text,
}
}
}
def build_response_multiple_choice(task: TaskInfo) -> dict:
"""multiple-choice: 选择题 — 优先使用expectedResponse中的正确选项"""
expected = task.expected_response.get("contents", {}).get("multipleChoice", {})
if expected:
return {"multipleChoice": expected}
# 回退:从task.detail获取options, 随机选一个
mc = task.task_detail.get("multipleChoice", {})
contents = mc.get("contents", {})
options = mc.get("options", list(contents.keys()) if contents else [])
result = {}
selected = False
for oid in (options if isinstance(options, list) else [options]):
if isinstance(oid, dict):
oid = oid.get("id", "")
result[oid] = {"id": oid, "userInput": "selected" if not selected else "not-selected"}
selected = True
return {"multipleChoice": {"contents": result}}
def build_response_text_highlights(task: TaskInfo) -> dict:
"""text-highlights: 文本高亮 — 标记所有内容已查看"""
expected = task.expected_response.get("contents", {}).get("textHighlights", {})
if expected:
return {"textHighlights": expected}
th = task.task_detail.get("textHighlights", {})
highlights = th.get("highlights", th.get("items", []))
result = {}
for h in highlights:
hid = h.get("id", "")
result[hid] = {"id": hid, "userInput": "viewed"}
return {"textHighlights": result}
def build_response_flashcards(task: TaskInfo) -> dict:
"""flashcards: 闪卡 — 优先使用expectedResponse,否则标记所有卡片已翻转、媒体已播放"""
expected = task.expected_response.get("contents", {}).get("flashcards", {})
if expected:
return {"flashcards": expected}
# 回退:从task.detail获取卡片信息,标记全部已翻转/播放
fc = task.task_detail.get("flashcards", {}).get("flashcards", [])
cards_state = {}
media_state = {}
for card in fc:
cid = card.get("id", "")
if cid:
cards_state[cid] = {"id": cid, "userInput": "flipped"}
audio_id = card.get("audio", {}).get("id", "")
if audio_id:
media_state[audio_id] = {"id": audio_id, "userInput": "played"}
image_id = card.get("image", {}).get("id", "")
if image_id:
media_state[image_id] = {"id": image_id, "userInput": "played"}
return {
"flashcards": {
"contentSeen": True,
"cardsState": cards_state,
"mediaState": media_state,
}
}
def build_response_categorisation(task: TaskInfo) -> dict:
"""categorisation: 分类题 — 优先使用expectedResponse,否则提交空映射"""
expected = task.expected_response.get("contents", {}).get("categorisation", {})
if expected:
return {"categorisation": expected}
# 回退:从task.detail获取items,全部设为未分类
cat = task.task_detail.get("categorisation", {})
items = cat.get("items", [])
result = {}
for item in items:
iid = item.get("id", "")
result[iid] = {"id": iid, "userInput": ""}
return {"categorisation": result}
# 任务类型 → 答题策略映射
RESPONSE_BUILDERS = {
"media-with-time-markers": build_response_media_with_time_markers,
"language-focus": build_response_language_focus,
"sequencing": build_response_sequencing,
"gapfill": build_response_gapfill,
"matching": build_response_matching,
"flashcards": build_response_flashcards,
"speaking-practice": build_response_speaking_practice,
"multiple-choice": build_response_multiple_choice,
"text-highlights": build_response_text_highlights,
"categorisation": build_response_categorisation,
}
# ============================================================
# 核心客户端
# ============================================================
class EFCourseAutopilot:
"""EF课程自动完成客户端
认证机制:
- 步骤① (learn.corporate域): 需要 X-EF-Access JWT token
- 步骤②-④ (api.ef.studio域): 需要 Authorization: Bearer JWT token
"""
def __init__(
self,
token: str = "",
cookie: str = "",
base_url: str = DEFAULT_BASE_URL,
studio_api: str = DEFAULT_STUDIO_API,
locale: str = DEFAULT_LOCALE,
timezone: str = DEFAULT_TIMEZONE,
delay: float = DEFAULT_DELAY,
skip_on_fail: bool = False,
verify_ssl: bool = False,
dry_run: bool = False,
verbose: bool = False,
):
self.token = token
self.cookie = cookie
self.base_url = base_url.rstrip("/")
self.studio_api = studio_api.rstrip("/")
self.locale = locale
self.timezone = timezone
self.delay = delay
self.skip_on_fail = skip_on_fail
self.verify_ssl = verify_ssl
self.dry_run = dry_run
self.verbose = verbose
# 根据locale推断instructionsLocale (zh-CN → zh_CN)
self.instructions_locale = locale.replace("-", "_")
if self.instructions_locale not in ("zh_CN", "en_US", "ja_JP", "ko_KR", "de_DE", "fr_FR", "es_ES", "pt_BR"):
self.instructions_locale = "en_US"
self.session = requests.Session()
self._setup_session()
# 状态
self.ctx = CourseContext()
self.lesson = LessonSession()
self.activities: list[ActivityInfo] = []
self.results: list[SubmitResult] = []
def _setup_session(self):
"""配置HTTP会话的通用headers和SSL设置"""
self.session.headers.update({
"Accept": "application/json, text/plain, */*",
"Content-Type": "application/json",
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/148.0.0.0 Safari/537.36"
),
"Cache-Control": "no-cache",
"Pragma": "no-cache",
})
# SSL证书验证
if not self.verify_ssl:
self.session.verify = False
# 禁用InsecureRequestWarning警告
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def _learn_headers(self) -> dict:
"""learn.corporate域名的请求headers (需要X-EF-Access或Cookie)
认证优先级:
1. X-EF-Access token (JWT, ~10小时有效)
2. Cookie (浏览器完整Cookie字符串)
"""
headers = {
"Referer": f"{self.base_url}/wl/study-plan?product=b2b",
"x-ef-correlation-id": self._correlation_id(),
}
if self.token:
headers["X-EF-Access"] = self.token
if self.cookie:
headers["Cookie"] = self.cookie
return headers
def _studio_headers(self) -> dict:
"""api.ef.studio域名的请求headers
需要 Authorization: Bearer JWT token。
"""
headers = {
"Origin": "https://lesson-player.ef.studio.ef.com.cn",
"Referer": "https://lesson-player.ef.studio.ef.com.cn/",
"x-ef-correlation-id": self._correlation_id(),
}
if self.token:
headers["Authorization"] = f"Bearer {self.token}"
return headers
def _correlation_id(self) -> str:
"""生成x-ef-correlation-id (UUID格式)"""
return str(uuid.uuid4())
def _log(self, msg: str):
"""普通日志"""
print(f" {msg}")
def _log_step(self, step: str, msg: str):
"""步骤日志"""
print(f"\n{'='*60}")
print(f" {step}: {msg}")
print(f"{'='*60}")
def _log_progress(self, current: int, total: int, label: str):
"""进度日志"""
pct = current / total * 100 if total > 0 else 0
bar_len = 30
filled = int(bar_len * current / total) if total > 0 else 0
bar = "" * filled + "" * (bar_len - filled)
print(f"\r [{bar}] {pct:.0f}% ({current}/{total}) {label}", end="", flush=True)
# --------------------------------------------------------
# 步骤①: 获取课程焦点
# --------------------------------------------------------
def step_get_focus(self) -> CourseContext:
"""GET /wl/api/study-plan/focus → 获取courseId/levelId/unitId/nodeId
需要认证:X-EF-Access token 或 Cookie
"""
self._log_step("步骤①", "GET /wl/api/study-plan/focus")
url = f"{self.base_url}/wl/api/study-plan/focus"
params = {"locale": self.locale}
headers = self._learn_headers()
if self.dry_run:
self._log(f"[DRY-RUN] GET {url} params={params}")
self._log(f"[DRY-RUN] Auth: {'X-EF-Access' if self.token else 'Cookie' if self.cookie else 'NONE'}")
return self.ctx
resp = self.session.get(url, params=params, headers=headers, timeout=REQUEST_TIMEOUT)
resp.raise_for_status()
data = resp.json()
if self.verbose:
self._log(f"Response: {json.dumps(data, ensure_ascii=False)[:800]}...")
# 解析focus数据 — 兼容多种响应结构
focus = data.get("focus", data)
course = focus.get("course", {})
level = focus.get("level", {})
unit = focus.get("unit", {})
lesson = focus.get("lesson", {})
self.ctx = CourseContext(
course_id=course.get("id", ""),
level_id=level.get("id", ""),
unit_id=unit.get("id", ""),
node_id=lesson.get("id", ""),
)
self._log(f"courseId = {self.ctx.course_id}")
self._log(f"levelId = {self.ctx.level_id}")
self._log(f"unitId = {self.ctx.unit_id}")
self._log(f"nodeId = {self.ctx.node_id}")
if not self.ctx.course_id or not self.ctx.node_id:
raise ValueError(
"未能从focus API获取必要的courseId/nodeId。\n"
"可能原因:\n"
" 1. Token已过期(有效期约10小时)\n"
" 2. 当前用户没有进行中的课程\n"
" 3. 需要手动指定 --course-id 和 --node-id"
)
return self.ctx
# --------------------------------------------------------
# 步骤②: 注册课程会话 (获取lessonId)
# --------------------------------------------------------
def step_open_lesson_enrollment(self) -> str:
"""POST /study/progress/enrollments/{courseId}/open-lesson → 获取lessonId
无需额外认证:courseId即为上下文凭证
"""
self._log_step("步骤②", f"POST enrollments/{self.ctx.course_id}/open-lesson")
url = f"{self.studio_api}/study/progress/enrollments/{self.ctx.course_id}/open-lesson"
body = {
"nodeId": self.ctx.node_id,
"instructionsLocale": self.instructions_locale,
"publishTag": "live",
}
headers = self._studio_headers()
if self.dry_run:
self._log(f"[DRY-RUN] POST {url}")
self._log(f" Body: {json.dumps(body, ensure_ascii=False)}")
self.lesson.lesson_id = "dry-run-lesson-id"
return self.lesson.lesson_id
resp = self.session.post(url, json=body, headers=headers, timeout=REQUEST_TIMEOUT)
resp.raise_for_status()
data = resp.json()
if self.verbose:
self._log(f"Response: {json.dumps(data, ensure_ascii=False)[:500]}...")
self.lesson.lesson_id = data.get("lessonId", "")
self._log(f"lessonId = {self.lesson.lesson_id}")
if not self.lesson.lesson_id:
raise ValueError("未能获取lessonId,请检查courseId和nodeId是否正确")
return self.lesson.lesson_id
# --------------------------------------------------------
# 步骤③: 初始化课程 (获取session/tasks)
# --------------------------------------------------------
def step_open_lesson_command(self) -> list[ActivityInfo]:
"""POST /study/lesson/command (open-lesson) → 获取sessionId/activities/tasks
无需额外认证:lessonId即为能力令牌
"""
self._log_step("步骤③", "POST lesson/command (open-lesson)")
url = f"{self.studio_api}/study/lesson/command"
body = {
"commandType": "open-lesson",
"commandData": {
"openLesson": {
"lessonId": self.lesson.lesson_id,
"instructionsLocale": self.instructions_locale,
}
},
"clientState": {
"lastVersion": 0,
"lessonId": self.lesson.lesson_id,
},
}
headers = self._studio_headers()
if self.dry_run:
self._log(f"[DRY-RUN] POST {url}")
self._log(f" Body: {json.dumps(body, ensure_ascii=False)[:300]}...")
return []
resp = self.session.post(url, json=body, headers=headers, timeout=REQUEST_TIMEOUT)
resp.raise_for_status()
data = resp.json()
# 检查命令状态
cmd_status = data.get("commandStatus", {})
if not cmd_status.get("successful", False):
raise ValueError(f"open-lesson命令失败: {cmd_status.get('errorMessage', 'unknown')}")
# 提取lastVersion
self.lesson.last_version = data.get("eventHistory", {}).get("lastVersion", 0)
self._log(f"lastVersion = {self.lesson.last_version}")
# 解析events获取sessionId和activities/tasks
events = data.get("eventHistory", {}).get("events", [])
for ev in events:
ev_type = ev.get("type", "")
# 提取sessionId
if ev_type == "student-joined-lesson":
joined = ev.get("data", {}).get("studentJoinedLesson", {})
self.lesson.session_id = joined.get("sessionId", "")
self._log(f"sessionId = {self.lesson.session_id}")
# 提取activities和tasks
if ev_type == "activity-sent":
act_data = ev.get("data", {}).get("activitySent", {})
act = act_data.get("activity", {})
activity_id = act.get("id", "")
tasks_raw = act.get("tasks", [])
activity = ActivityInfo(activity_id=activity_id)
for t in tasks_raw:
task_info = TaskInfo(
activity_id=activity_id,
task_id=t.get("id", ""),
task_type=t.get("taskType", ""),
expected_response=t.get("expectedResponse", {}),
task_detail=t.get("task", {}),
)
activity.tasks.append(task_info)
self.activities.append(activity)
total_tasks = sum(len(a.tasks) for a in self.activities)
self._log(f"共获取 {len(self.activities)} 个Activity, {total_tasks} 个Task")
for i, act in enumerate(self.activities, 1):
for j, task in enumerate(act.tasks, 1):
self._log(
f" Activity {i}/{len(self.activities)} "
f"Task {j}/{len(act.tasks)}: "
f"type={task.task_type}, "
f"id={task.task_id[:16]}..."
)
if not self.lesson.session_id:
raise ValueError("未能从open-lesson响应中提取sessionId")
return self.activities
# --------------------------------------------------------
# 步骤④: 提交任务答案
# --------------------------------------------------------
def step_submit_task(self, task: TaskInfo, time_spent: int = 5) -> SubmitResult:
"""POST /study/lesson/command (submit-task-response) → 提交单个任务
关键:lastVersion必须链式递增,每次提交返回新version
"""
url = f"{self.studio_api}/study/lesson/command"
# 根据taskType构建响应内容
builder = RESPONSE_BUILDERS.get(task.task_type)
if not builder:
self._log(f" ⚠ 未知任务类型: {task.task_type}, 使用空响应")
contents = {}
else:
contents = builder(task)
response_body = {
"taskId": task.task_id,
"type": task.task_type,
"contents": contents,
}
body = {
"commandType": "submit-task-response",
"commandData": {
"submitTaskResponse": {
"lessonId": self.lesson.lesson_id,
"activityId": task.activity_id,
"sessionId": self.lesson.session_id,
"response": response_body,
"timeSpentSecs": time_spent,
}
},
"clientState": {
"lessonId": self.lesson.lesson_id,
"lastVersion": self.lesson.last_version,
},
}
headers = self._studio_headers()
if self.dry_run:
self._log(f" [DRY-RUN] taskId={task.task_id[:16]}... "
f"type={task.task_type} version={self.lesson.last_version}")
return SubmitResult(
task_id=task.task_id,
task_type=task.task_type,
success=True,
new_version=self.lesson.last_version + 1,
)
for attempt in range(MAX_RETRIES + 1):
try:
resp = self.session.post(url, json=body, headers=headers, timeout=REQUEST_TIMEOUT)
resp.raise_for_status()
data = resp.json()
# 更新lastVersion(链式依赖的关键!)
new_version = data.get("eventHistory", {}).get("lastVersion", 0)
cmd_status = data.get("commandStatus", {})
success = cmd_status.get("successful", False)
if success:
self.lesson.last_version = new_version
return SubmitResult(
task_id=task.task_id,
task_type=task.task_type,
success=True,
new_version=new_version,
)
else:
error_msg = cmd_status.get("errorMessage", "unknown")
return SubmitResult(
task_id=task.task_id,
task_type=task.task_type,
success=False,
error=error_msg,
new_version=self.lesson.last_version,
)
except requests.exceptions.RequestException as e:
if attempt < MAX_RETRIES:
self._log(f" ⚠ 请求失败(尝试 {attempt+1}/{MAX_RETRIES+1}): {e}")
time.sleep(1)
continue
return SubmitResult(
task_id=task.task_id,
task_type=task.task_type,
success=False,
error=str(e),
)
def step_skip_task(self, task: TaskInfo) -> SubmitResult:
"""POST /study/lesson/command (skip-task) → 跳过某个任务(降级策略)
当submit失败时,尝试skip作为回退
"""
url = f"{self.studio_api}/study/lesson/command"
body = {
"commandType": "skip-task",
"commandData": {
"skipTask": {
"lessonId": self.lesson.lesson_id,
"activityId": task.activity_id,
"sessionId": self.lesson.session_id,
"taskId": task.task_id,
}
},
"clientState": {
"lessonId": self.lesson.lesson_id,
"lastVersion": self.lesson.last_version,
},
}
headers = self._studio_headers()
if self.dry_run:
self._log(f" [DRY-RUN] SKIP taskId={task.task_id[:16]}... version={self.lesson.last_version}")
return SubmitResult(
task_id=task.task_id,
task_type=task.task_type,
success=True,
skipped=True,
new_version=self.lesson.last_version + 1,
)
try:
resp = self.session.post(url, json=body, headers=headers, timeout=REQUEST_TIMEOUT)
resp.raise_for_status()
data = resp.json()
new_version = data.get("eventHistory", {}).get("lastVersion", 0)
cmd_status = data.get("commandStatus", {})
success = cmd_status.get("successful", False)
if success:
self.lesson.last_version = new_version
return SubmitResult(
task_id=task.task_id,
task_type=task.task_type,
success=True,
skipped=True,
new_version=new_version,
)
else:
error_msg = cmd_status.get("errorMessage", "unknown")
return SubmitResult(
task_id=task.task_id,
task_type=task.task_type,
success=False,
error=f"skip-failed: {error_msg}",
)
except requests.exceptions.RequestException as e:
return SubmitResult(
task_id=task.task_id,
task_type=task.task_type,
success=False,
error=f"skip-exception: {e}",
)
# --------------------------------------------------------
# 主流程
# --------------------------------------------------------
def run(self, course_id: str = "", node_id: str = "") -> bool:
"""执行完整的自动完成课程流程"""
print(f"\n{'#'*60}")
print(f" EF Course Autopilot")
print(f" base_url = {self.base_url}")
print(f" studio_api= {self.studio_api}")
print(f" locale = {self.locale} timezone = {self.timezone}")
print(f" delay = {self.delay}s skip_on_fail = {self.skip_on_fail}")
print(f" dry_run = {self.dry_run} verbose = {self.verbose}")
print(f" ssl_verify= {self.verify_ssl}")
print(f" auth = {'X-EF-Access token' if self.token else 'Cookie' if self.cookie else '⚠️ NO AUTH'}")
if not self.verify_ssl:
print(f" ⚠️ SSL证书验证已禁用")
print(f"{'#'*60}")
if not self.token and not self.cookie:
print("\n❌ 错误:必须提供 --token 或 --cookie 参数")
return False
start_time = time.time()
try:
# 步骤①: 获取课程焦点
if node_id and course_id:
self.ctx = CourseContext(
course_id=course_id,
node_id=node_id,
)
self._log_step("步骤①", "使用手动指定的courseId/nodeId (跳过focus API)")
self._log(f"courseId = {self.ctx.course_id}")
self._log(f"nodeId = {self.ctx.node_id}")
else:
self.step_get_focus()
# 步骤②: 注册课程会话
self.step_open_lesson_enrollment()
# 步骤③: 初始化课程
self.step_open_lesson_command()
# 步骤④: 逐个提交任务
total_tasks = sum(len(a.tasks) for a in self.activities)
if total_tasks == 0:
self._log("\n⚠️ 没有发现任何任务,课程可能已完成或结构异常")
return True
self._log_step("步骤④", f"开始提交任务答案 (共 {total_tasks} 个)")
completed = 0
for ai, activity in enumerate(self.activities, 1):
for ti, task in enumerate(activity.tasks, 1):
completed += 1
task_label = (f"[{completed}/{total_tasks}] "
f"Act{ai}/{len(self.activities)} "
f"Task{ti}/{len(activity.tasks)} "
f"type={task.task_type}")
# 进度条
self._log_progress(completed, total_tasks, task.task_type)
# 模拟真实操作时间 (3~15秒随机)
time_spent = random.randint(3, 15)
# 提交任务
result = self.step_submit_task(task, time_spent=time_spent)
if result.success:
self.results.append(result)
# 详细日志输出到新行
print() # 换行(进度条后)
self._log(f"{task_label} version → {self.lesson.last_version}")
else:
print() # 换行
self._log(f"{task_label} 错误: {result.error}")
if self.skip_on_fail:
# 降级策略:尝试skip
self._log(f" ↳ 尝试跳过任务...")
skip_result = self.step_skip_task(task)
if skip_result.success:
self._log(f" ↳ ⏭️ 已跳过 version → {self.lesson.last_version}")
skip_result.task_type = task.task_type
self.results.append(skip_result)
else:
self._log(f" ↳ ❌ 跳过也失败: {skip_result.error}")
self.results.append(result)
else:
self.results.append(result)
# 延迟(模拟真实操作节奏)
if self.delay > 0 and not self.dry_run:
time.sleep(self.delay)
# 汇总报告
elapsed = time.time() - start_time
self._print_summary(total_tasks, elapsed)
return all(r.success for r in self.results)
except requests.exceptions.HTTPError as e:
print(f"\n❌ HTTP错误: {e}")
if e.response is not None:
print(f" 状态码: {e.response.status_code}")
print(f" 响应: {e.response.text[:500]}")
if e.response.status_code == 401:
print(f" 💡 提示:Token可能已过期,请重新获取")
elif e.response.status_code == 403:
print(f" 💡 提示:权限不足,请检查Token是否对应用户")
return False
except json.JSONDecodeError as e:
print(f"\n❌ 响应解析失败: {e}")
return False
except Exception as e:
print(f"\n❌ 执行异常: {e}")
traceback.print_exc()
return False
def _print_summary(self, total: int, elapsed: float):
"""打印执行汇总"""
success_count = sum(1 for r in self.results if r.success and not r.skipped)
skipped_count = sum(1 for r in self.results if r.skipped)
failed_count = sum(1 for r in self.results if not r.success)
print(f"\n{'#'*60}")
print(f" 执行完成!")
print(f"{'#'*60}")
print(f" 总任务数: {total}")
print(f" ✅ 成功提交: {success_count}")
print(f" ⏭️ 跳过任务: {skipped_count}")
print(f" ❌ 失败任务: {failed_count}")
print(f" ⏱️ 总耗时: {elapsed:.1f}")
print(f" 📊 最终version: {self.lesson.last_version}")
if failed_count > 0:
print(f"\n 失败任务详情:")
for r in self.results:
if not r.success:
print(f" - type={r.task_type} id={r.task_id[:16]}... error={r.error}")
# 按任务类型统计
type_stats: dict[str, dict] = {}
for r in self.results:
t = r.task_type
if t not in type_stats:
type_stats[t] = {"total": 0, "success": 0, "failed": 0, "skipped": 0}
type_stats[t]["total"] += 1
if r.success and not r.skipped:
type_stats[t]["success"] += 1
elif r.skipped:
type_stats[t]["skipped"] += 1
else:
type_stats[t]["failed"] += 1
if type_stats:
print(f"\n 任务类型统计:")
for t, s in type_stats.items():
print(f" {t:30s} {s['success']}/{s['total']} 成功"
+ (f" {s['skipped']} 跳过" if s['skipped'] else "")
+ (f" {s['failed']} 失败" if s['failed'] else ""))
print(f"{'#'*60}")
# ============================================================
# 命令行入口
# ============================================================
def main():
parser = argparse.ArgumentParser(
description="EF Course Autopilot — 自动完成EF课程测试脚本",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
# 最简使用:仅提供token,自动获取课程信息并完成
python3 ef_course_autopilot.py --token "eyJraWQiOi..."
# 使用Cookie认证(完整Cookie字符串)
python3 ef_course_autopilot.py --cookie "_ga=...; session=..."
# 指定特定课时
python3 ef_course_autopilot.py --token "eyJraWQiOi..." --course-id 51c6844b-xxx --node-id 6b530eb9-xxx
# 失败时自动跳过继续
python3 ef_course_autopilot.py --token "eyJraWQiOi..." --skip-on-fail
# 遇到SSL证书错误时禁用验证
python3 ef_course_autopilot.py --token "eyJraWQiOi..." --verify-ssl
# 仅预览(不实际发送请求)
python3 ef_course_autopilot.py --token "eyJraWQiOi..." --dry-run
# 详细模式
python3 ef_course_autopilot.py --token "eyJraWQiOi..." --verbose
获取token方法:
1. 浏览器登录 learn.corporate.ef.com.cn
2. 打开DevTools → Network
3. 找到任意 /wl/api/ 请求
4. 复制 Request Headers 中的 X-EF-Access 值
获取Cookie方法:
1. 浏览器登录 learn.corporate.ef.com.cn
2. 打开DevTools → Application → Cookies
3. 复制完整Cookie字符串
参数依赖说明:
--token 仅步骤①(focus)需要,步骤②-④无需认证
--cookie 可替代--token,需包含完整Cookie字符串
最简输入: 仅需 --token 或 --cookie 中的一个
""",
)
# 认证参数(二选一)
auth_group = parser.add_argument_group("认证参数(二选一)")
auth_group.add_argument(
"--token", default="",
help="X-EF-Access JWT token (从浏览器DevTools获取,约10小时有效)",
)
auth_group.add_argument(
"--cookie", default="",
help="浏览器完整Cookie字符串 (可替代--token)",
)
# 目标参数
target_group = parser.add_argument_group("目标参数(可选,默认自动获取)")
target_group.add_argument(
"--course-id", default="",
help="直接指定courseId (配合--node-id使用, 跳过focus API)",
)
target_group.add_argument(
"--node-id", default="",
help="直接指定要打开的课时nodeId (配合--course-id使用)",
)
# 环境参数
env_group = parser.add_argument_group("环境参数")
env_group.add_argument(
"--base-url", default=DEFAULT_BASE_URL,
help=f"EF学习平台基础URL (默认: {DEFAULT_BASE_URL})",
)
env_group.add_argument(
"--studio-api", default=DEFAULT_STUDIO_API,
help=f"Studio API基础URL (默认: {DEFAULT_STUDIO_API})",
)
env_group.add_argument(
"--locale", default=DEFAULT_LOCALE,
help=f"界面语言 (默认: {DEFAULT_LOCALE})",
)
env_group.add_argument(
"--timezone", default=DEFAULT_TIMEZONE,
help=f"时区 (默认: {DEFAULT_TIMEZONE})",
)
# 行为参数
behavior_group = parser.add_argument_group("行为参数")
behavior_group.add_argument(
"--delay", type=float, default=DEFAULT_DELAY,
help=f"每次submit之间的延迟秒数 (默认: {DEFAULT_DELAY})",
)
behavior_group.add_argument(
"--skip-on-fail", action="store_true",
help="提交失败时自动跳过该任务,尝试skip-task继续",
)
behavior_group.add_argument(
"--verify-ssl", action="store_true",
help="启用SSL证书验证(默认禁用,解决macOS证书问题)",
)
behavior_group.add_argument(
"--dry-run", action="store_true",
help="仅打印将要执行的请求,不实际发送",
)
behavior_group.add_argument(
"--verbose", action="store_true",
help="打印详细请求/响应信息",
)
args = parser.parse_args()
# 验证认证参数
if not args.token and not args.cookie:
parser.error("必须提供 --token 或 --cookie 参数(二选一)")
if args.course_id and not args.node_id:
parser.error("指定了 --course-id 但缺少 --node-id,两者必须同时指定")
if args.node_id and not args.course_id:
parser.error("指定了 --node-id 但缺少 --course-id,两者必须同时指定")
autopilot = EFCourseAutopilot(
token=args.token,
cookie=args.cookie,
base_url=args.base_url,
studio_api=args.studio_api,
locale=args.locale,
timezone=args.timezone,
delay=args.delay,
skip_on_fail=args.skip_on_fail,
verify_ssl=args.verify_ssl,
dry_run=args.dry_run,
verbose=args.verbose,
)
success = autopilot.run(
course_id=args.course_id,
node_id=args.node_id,
)
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()