ha4t.drivers.ios 源代码

# -*- coding: utf-8 -*-
"""IOSDriver: 基于 facebook-wda 的 iOS 平台驱动。"""
import os
import subprocess
from typing import Optional, Tuple

import PIL.Image
import wda

from ha4t.drivers.base import BaseDriver
from ha4t.exceptions import DeviceConnectionError


class IOSDriver(BaseDriver):
    """iOS platform driver backed by facebook-wda (WebDriverAgent).

    UI operations are delegated to the ``wda`` client created by
    :meth:`connect`; file transfer shells out to the ``tidevice`` / ``t3``
    command line tools.  Every method except :meth:`connect` expects
    :meth:`connect` to have been called first.
    """

    def __init__(self):
        # WDA client; stays None until connect() succeeds.
        self._d: Optional[wda.Client] = None
        # Serial (UDID) of the selected device; empty until connect() runs.
        self._serial: str = ''

    # ── Connection ──────────────────────────────────────────────
    def connect(self, serial: Optional[str] = None, port: int = 8100,
                **kwargs) -> str:
        """Connect to a device over USB and return its serial.

        Args:
            serial: Device UDID.  When falsy, the first device reported by
                ``wda.list_devices()`` is used.
            port: Port passed to ``wda.USBClient``.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            The serial of the device that was selected.

        Raises:
            DeviceConnectionError: If no device is listed, or if creating
                the USB client fails for any reason.
        """
        try:
            self._serial = serial or wda.list_devices()[0].serial
        except IndexError as e:
            raise DeviceConnectionError("未找到 iOS 设备,请检查 USB 连接和信任设置") from e
        try:
            self._d = wda.USBClient(udid=self._serial, port=port)
        except Exception as e:
            # Any client-construction failure is surfaced as a connection error.
            raise DeviceConnectionError(f"iOS 设备 {self._serial} 连接失败:{e}") from e
        return self._serial

    # ── Device information ──────────────────────────────────────
    def get_device_info(self) -> dict:
        """Return a copy of the client's ``info`` mapping plus a ``serial`` key."""
        info = dict(self._d.info)
        info["serial"] = self._serial
        return info

    def screen_size(self) -> Tuple[int, int]:
        """Return whatever the client's ``window_size()`` reports."""
        return self._d.window_size()

    # ── Screenshot ──────────────────────────────────────────────
    def screenshot(self) -> PIL.Image.Image:
        """Return the client's screenshot unchanged."""
        # The original branched on isinstance(img, PIL.Image.Image) but
        # returned the same object on both paths, so the check was a no-op.
        return self._d.screenshot()

    # ── Tap / gestures ──────────────────────────────────────────
    def tap(self, x: int, y: int, duration: float = 0.1) -> None:
        """Press at ``(x, y)`` for ``duration`` seconds."""
        # On iOS the press is implemented with tap_hold.
        self._d.tap_hold(x, y, duration=duration)

    def swipe(self, x1: int, y1: int, x2: int, y2: int,
              duration: Optional[float] = None,
              steps: Optional[int] = None) -> None:
        """Swipe from ``(x1, y1)`` to ``(x2, y2)``.

        Args:
            duration: Forwarded to the client when given.  When ``None`` the
                argument is omitted so the client's own default is used
                instead of an explicit ``None``.
            steps: Ignored; wda's swipe does not support it.
        """
        if duration is None:
            self._d.swipe(x1, y1, x2, y2)
        else:
            self._d.swipe(x1, y1, x2, y2, duration=duration)

    def press(self, key: str) -> None:
        """Forward ``key`` to the client's ``press``."""
        self._d.press(key)

    # ── Element lookup ──────────────────────────────────────────
    def find(self, **kwargs):
        """Return the result of calling the client with ``kwargs`` as selector."""
        return self._d(**kwargs)

    def find_xpath(self, xpath: str):
        """Return the result of the client's ``xpath`` lookup."""
        return self._d.xpath(xpath)

    def get_element_center(self, **kwargs):
        """Return the integer ``(x, y)`` centre of the element matched by ``kwargs``."""
        el = self._d(**kwargs)
        rect = el.bounds
        # NOTE(review): this assumes ``bounds`` exposes ``origin.x/y`` and
        # ``size.width/height`` -- confirm against the installed wda version.
        return (int(rect.origin.x + rect.size.width / 2),
                int(rect.origin.y + rect.size.height / 2))

    # ── App management ──────────────────────────────────────────
    def app_start(self, app_name: str, activity: Optional[str] = None) -> None:
        """Start ``app_name``; ``activity`` is accepted but not used."""
        self._d.app_start(app_name)

    def app_stop(self, app_name: str) -> None:
        """Stop ``app_name``."""
        self._d.app_stop(app_name)

    def app_current(self) -> str:
        """Return the ``bundleId`` entry of the client's ``app_current()``."""
        return self._d.app_current()["bundleId"]

    # ── File transfer (tidevice / t3) ───────────────────────────
    def push_file(self, local_path: str, remote_path: str,
                  app_name: str = '') -> None:
        """Push ``local_path`` into the app container via ``tidevice fsync``.

        ``remote_path`` is a path relative to the app container, for
        example ``Documents/xxx``.

        Raises:
            ValueError: If ``app_name`` is empty.
            subprocess.CalledProcessError: If the command exits non-zero.
        """
        if not app_name:
            raise ValueError("iOS push_file 需要传入 app_name")
        subprocess.run(
            ["tidevice", '-u', self._serial, 'fsync', '-B', app_name,
             'push', local_path, remote_path],
            check=True
        )

    def pull_file(self, remote_path: str, local_path: str,
                  app_name: str = '') -> None:
        """Pull a file out of the app container via ``t3 fsync pull``.

        Raises:
            ValueError: If ``app_name`` is empty.
            subprocess.CalledProcessError: If the command exits non-zero.
        """
        if not app_name:
            raise ValueError("iOS pull_file 需要传入 app_name")
        # Pass an argv list instead of a shell string: arguments containing
        # spaces or shell metacharacters are delivered verbatim and cannot be
        # interpreted by a shell.
        # NOTE(review): unlike the tidevice calls, no device serial is passed
        # here -- confirm whether t3 should also target self._serial.
        subprocess.run(
            ["t3", "fsync", "-B", app_name, "pull", remote_path, local_path],
            check=True
        )

    def delete_file(self, remote_path: str, app_name: str = '') -> None:
        """Remove ``remote_path`` from the app container via ``tidevice fsync rmtree``.

        Raises:
            ValueError: If ``app_name`` is empty.
            subprocess.CalledProcessError: If the command exits non-zero.
        """
        if not app_name:
            raise ValueError("iOS delete_file 需要传入 app_name")
        subprocess.run(
            ["tidevice", '-u', self._serial, 'fsync', '-B', app_name,
             'rmtree', remote_path],
            check=True
        )

    def mkdir(self, remote_path: str, app_name: str = '') -> None:
        """Create ``remote_path`` in the app container via ``tidevice fsync mkdir``.

        Raises:
            ValueError: If ``app_name`` is empty.
            subprocess.CalledProcessError: If the command exits non-zero.
        """
        # Validate up front like the other file-transfer methods rather than
        # invoking tidevice with an empty bundle id.
        if not app_name:
            raise ValueError("iOS mkdir 需要传入 app_name")
        subprocess.run(
            ["tidevice", '-u', self._serial, 'fsync', '-B', app_name,
             'mkdir', remote_path],
            check=True
        )