ha4t.api 源代码

# -*- coding: utf-8 -*-
# @时间       : 2024/8/23 15:24
# @作者       : caishilong
# @文件名      : api.py
# @项目名      : Uimax
# @Software   : PyCharm
"""
此模块包含UI自动化操作接口
提供操作如:点击、滑动、输入、OCR识别等
"""
import os
import subprocess
import time
from typing import Optional, Union, List

import PIL.Image
import logreset
import numpy as np

from ha4t import screen_size, device
from ha4t.aircv.cv import match_loop, Template
from ha4t.config import Config as _CF
from ha4t.orc import OCR
from ha4t.utils.files_operat import get_file_list as _get_file_list
from ha4t.utils.log_utils import log_out, cost_time

logreset.reset_logging()  # paddleocr 会污染 logging
ocr = OCR()


[文档] @cost_time def click(*args, duration: float = 0.1, **kwargs) -> None: """ 点击操作,支持多种定位方式 用法: :Example: >>> click((100,100)) # 坐标点击 >>> click("TEXT") # 文字点击, OCR识别 >>> click(image="path/to/image.png") # 图像匹配点击 >>> click(**kwargs) # uiautomator2/wda的点击(适合原生app,速度快,非H5应用建议使用) """ def perform_click(x, y, duration): """ 执行点击操作,根据平台选择不同的点击方式 :param x: x坐标 :param y: y坐标 :param duration: 点击持续时间 """ if _CF.PLATFORM == "ios": device.driver.tap_hold(x, y, duration=duration) else: device.driver.long_click(x, y, duration=duration) if args: if isinstance(args[0], tuple): if len(args[0]) == 0: raise ValueError("参数不能是空元组") if isinstance(args[0][0], int): perform_click(*args[0], duration) elif isinstance(args[0][0], str): raise NotImplementedError("webview点击暂不支持") elif isinstance(args[0], str): pos = ocr.get_text_pos(args[0], device.driver.screenshot, index=args[1] if len(args) > 1 else 0) perform_click(*pos, duration) perform_click(*pos, duration) elif isinstance(args[0], Template): pos = match_loop(screenshot_func=device.driver.screenshot, template=args[0].filepath, timeout=kwargs.get("timeout", 10), threshold=kwargs.get("threshold", 0.8)) perform_click(*pos, duration) elif isinstance(args[0], dict): if _CF.PLATFORM == "ios": device.driver(**args[0], **kwargs).tap_hold(duration=duration) else: device.driver(**args[0], **kwargs).long_click(duration=duration) elif kwargs.get("image"): path = os.path.join(_CF.CURRENT_PATH, kwargs["image"]) pos = match_loop(screenshot_func=device.driver.screenshot, template=path, timeout=kwargs.get("timeout", 10), threshold=kwargs.get("threshold", 0.8)) perform_click(*pos, duration) else: if _CF.PLATFORM == "ios": device.driver(**kwargs).tap_hold(duration=duration) else: device.driver(**kwargs).long_click(duration=duration)
def _exists(*args, **kwargs) -> bool: """ 判断元素是否存在 :param args: 可变参数,用于不同的定位方式 :param kwargs: 关键字参数,用于uiautomator2/wda的定位 :return: 元素是否存在 :Example: >>> exists((100,100)) # 判断坐标元素是否存在 >>> exists("TEXT") # 判断文字元素是否存在, OCR识别 >>> exists(image="path/to/image.png") # 判断图像元素是否存在 >>> exists(**kwargs) # 判断uiautomator2/wda的元素是否存在 """ if args: if isinstance(args[0], tuple): if isinstance(args[0][0], int): return True elif isinstance(args[0][0], str): raise NotImplementedError("webview点击暂不支持") elif isinstance(args[0], str): try: pos = ocr.get_text_pos(args[0], device.driver.screenshot, index=args[1] if len(args) > 1 else 0, timeout=1) return True except: return False elif isinstance(args[0], dict): path = os.path.join(_CF.CURRENT_PATH, args[0]["image"]) try: match_loop(screenshot_func=device.driver.screenshot, template=path, timeout=kwargs.get("timeout", 10), threshold=kwargs.get("threshold", 0.8)) return True except: return False elif isinstance(args[0], Template): try: match_loop(screenshot_func=device.driver.screenshot, template=args[0].filepath, timeout=kwargs.get("timeout", 10), threshold=kwargs.get("threshold", 0.8)) return True except: return False else: if kwargs.get("image"): path = os.path.join(_CF.CURRENT_PATH, kwargs["image"]) pos = match_loop(screenshot_func=device.driver.screenshot, template=path, timeout=kwargs.get("timeout", 10), threshold=kwargs.get("threshold", 0.8)) if pos: return True else: return False else: return device.driver(**kwargs).exists
[文档] @cost_time def exists(*args, **kwargs) -> bool: """ 判断元素是否存在 :param args: 可变参数,用于不同的定位方式 :param kwargs: 关键字参数,用于uiautomator2/wda的定位 :return: 元素是否存在 :Example: >>> exists((100,100)) # 坐标点击 >>> exists("TEXT") # 文字点击, OCR识别 >>> exists(image="path/to/image.png") # 图像匹配点击 >>> exists(**kwargs) # uiautomator2/wda的点击(适合原生app,速度快,非H5应用建议使用) """ return _exists(*args, **kwargs)
[文档] @cost_time def wait(*args, timeout: float = _CF.FIND_TIMEOUT, reverse: bool = False, raise_error: bool = True, use_in_text: bool = False, **kwargs): """ 等待元素出现,支持多种定位方式 用法: 1. wait("TEXT") # 文字等待, OCR识别 2. web等待 3. uiautomator2/wda的等待(适合原生app,速度快,非H5应用建议使用) :param use_in_text: 是否在文本中使用 :param raise_error: 是否抛出错误 :param reverse: 反向等待 :param args: 可变参数,用于不同的定位方式 :param timeout: 等待超时时间,默认为CF.FIND_TIMEOUT :param kwargs: 关键字参数,用于uiautomator2/wda的定位 :return: 元素是否出现 :Example: >>> wait("TEXT") # 文字等待, OCR识别 >>> wait(image="path/to/image.png") # 图像匹配等待 >>> wait(**kwargs) # uiautomator2/wda的等待(适合原生app,速度快,非H5应用建议使用) """ start_time = time.time() if use_in_text: if isinstance(args[0], str): while True: if reverse: if args[0] not in get_page_text(): return True else: if args[0] in get_page_text(): return True if time.time() - start_time > timeout: if raise_error: raise TimeoutError(f"等待OCR识别到指定文字[{args[0]}]超时") else: return False while True: if reverse: if not _exists(*args, **kwargs): return True else: if _exists(*args, **kwargs): return True if time.time() - start_time > timeout: if raise_error: raise TimeoutError(f"等待元素超时:{args}, {kwargs}") else: return False
[文档] @cost_time def swipe(p1, p2, duration=None, steps=None): """ uiautomator2/wda的滑动操作 :param p1: 起始位置,(x, y)坐标或比例 :param p2: 结束位置,(x, y)坐标或比例 :param duration: 滑动持续时间 :param steps: 滑动步数,1步约5ms,如果设置则忽略duration :Example: >>> swipe((0.5, 0.8), (0.5, 0.3)) # 从中间向上滑动 >>> swipe((0.2, 0.5), (0.8, 0.5), duration=0.5) # 从左向右滑动 """ def calculate_position(p): return (int(p[0] * screen_size[0]), int(p[1] * screen_size[1])) if isinstance(p[0], float) else p pos1 = calculate_position(p1) pos2 = calculate_position(p2) device.driver.swipe(*pos1, *pos2, duration=duration, steps=steps)
[文档] def get_page_text() -> str: """ OCR识别页面文字,返回当前页面所有文字的拼接字符串 可用于断言 :return: 页面上的所有文字拼接成的字符串 """ return ocr.get_page_text(device.driver.screenshot)
[文档] @cost_time def swipe_up(duration: float = 0.2, steps: Optional[int] = None) -> None: """ 向上滑动 :param duration: 滑动持续时间 :param steps: 滑动步数 :Example: >>> swipe_up() # 默认持续时间向上滑动 >>> swipe_up(duration=0.5, steps=10) # 自定义持续时间和步数向上滑动 """ swipe((0.5, 0.8), (0.5, 0.3), duration, steps)
[文档] @cost_time def swipe_down(duration: float = 0.2, steps: Optional[int] = None) -> None: """ 向下滑动 :param duration: 滑动持续时间 :param steps: 滑动步数 :Example: >>> swipe_down() # 默认持续时间向下滑动 >>> swipe_down(duration=0.5, steps=10) # 自定义持续时间和步数向下滑动 """ swipe((0.5, 0.3), (0.5, 0.8), duration, steps)
[文档] @cost_time def swipe_left(duration: float = 0.1, steps: Optional[int] = None) -> None: """ 向左滑动 :param duration: 滑动持续时间 :param steps: 滑动步数 :Example: >>> swipe_left() # 默认持续时间向左滑动 >>> swipe_left(duration=0.5, steps=10) # 自定义持续时间和步数向左滑动 """ swipe((0.8, 0.5), (0.2, 0.5), duration, steps)
[文档] @cost_time def swipe_right(duration: float = 0.1, steps: Optional[int] = None) -> None: """ 向右滑动 :param duration: 滑动持续时间 :param steps: 滑动步数 :Example: >>> swipe_right() # 默认持续时间向右滑动 >>> swipe_right(duration=0.5, steps=10) # 自定义持续时间和步数向右滑动 """ swipe((0.2, 0.5), (0.8, 0.5), duration, steps)
[文档] def screenshot(filename: Optional[str] = None) -> PIL.Image.Image: """ 截图并可选保存到本地 :param filename: 保存截图的文件名,如果为None则不保存 :return: 截图的PIL.Image对象 :Example: >>> img = screenshot() # 截图并不保存 >>> screenshot("screenshot.png") # 截图并保存为文件 """ img = device.driver.screenshot() img = PIL.Image.fromarray(img) if isinstance(img, np.ndarray) else img if filename: img.save(filename) return img
[文档] @cost_time def home() -> None: """返回桌面 :Example: >>> home() # 返回主屏幕 """ device.driver.press("home")
[文档] @cost_time def pull_file(src_path: Union[List[str], str], filename: str) -> None: """ 从app本地路径下载文件到本地 :param src_path: 路径列表或字符串,ios为Documents/xxx,android为/data/data/xxx/files/xxx :param filename: 本地文件名 :Example: >>> pull_file("Documents/file.txt", "local_file.txt") # 从app下载文件 """ log_out(f"从app本地路径{src_path}下载文件{filename}到本地") base = f"t3 fsync -B {_CF.APP_NAME} pull " if _CF.PLATFORM == "ios" else f"adb -s {_CF.DEVICE_SERIAL} pull " root_path = "Library" if _CF.PLATFORM == "ios" else f"/sdcard/Android/data/{_CF.APP_NAME}/files" root_path += "/" + ("/".join(src_path) if isinstance(src_path, list) else src_path) cmd = base + root_path + " " + filename # 执行命令 try: subprocess.run(cmd, shell=True, check=True) log_out(f"文件{root_path}下载成功,路径:{filename}") except subprocess.CalledProcessError as e: log_out(f"文件{root_path}下载失败,原因:{e}", 2) raise
[文档] @cost_time def upload_files(src_path: str) -> None: """ 上传文件或文件夹到设备 :param src_path: 源文件或文件夹路径,可以是列表或字符串 :raises Exception: 如果上传过程中出现错误 :Example: >>> upload_files("local_file.txt") # 上传单个文件 >>> upload_files("my_folder") # 上传文件夹 """ try: if os.path.isdir(src_path): _upload_directory(src_path) else: _upload_file(src_path) log_out( f"文件或文件夹 {src_path} 上传成功!\n" f"{'安卓' if _CF.PLATFORM == 'android' else 'iOS'}路径:{'我的iPhone/' + _CF.APP_NAME if _CF.PLATFORM == 'ios' else '/sdcard'}/{os.path.basename(src_path)}" ) except Exception as e: log_out(f"文件或文件夹 {src_path} 上传失败,原因:{e}", 2) raise
def _upload_directory(dir_path: str) -> None: """ 上传文件夹到设备 :param dir_path: 文件夹路径 :Example: >>> _upload_directory("my_folder") # 上传文件夹 """ if _CF.PLATFORM == "ios": dir_name = os.path.basename(dir_path) subprocess.run( ["tidevice", '-u', _CF.DEVICE_SERIAL, 'fsync', "-B", _CF.APP_NAME, 'mkdir', f"Documents/{dir_name}"], check=True) for file in _get_file_list(dir_path): subprocess.run(["tidevice", '-u', _CF.DEVICE_SERIAL, 'fsync', "-B", _CF.APP_NAME, 'push', file, f"Documents/{dir_name}/{os.path.basename(file)}"], check=True) else: subprocess.run(f"adb -s {_CF.DEVICE_SERIAL} push {dir_path} /sdcard/", shell=True, check=True) def _upload_file(file_path: str) -> None: """ 上传单个文件到设备 :param file_path: 文件路径 :Example: >>> _upload_file("file.txt") # 上传单个文件 """ if _CF.PLATFORM == "ios": subprocess.run(["tidevice", '-u', _CF.DEVICE_SERIAL, 'fsync', "-B", _CF.APP_NAME, 'push', file_path, f"Documents/{os.path.basename(file_path)}"], check=True) else: subprocess.run(f"adb -s {_CF.DEVICE_SERIAL} push {file_path} /sdcard/", shell=True, check=True)
[文档] @cost_time def delete_file(file_path: Union[List[str], str]) -> None: """ 删除设备上的文件或文件夹 :param file_path: 要删除的文件或文件夹路径,可以是列表或字符串 :raises Exception: 如果删除过程中出现错误 :Example: >>> delete_file("Documents/file.txt") # 删除单个文件 >>> delete_file(["Documents", "my_folder"]) # 删除文件夹 """ def directory_exists(file_path1): """检查设备上的目录是否存在""" result = subprocess.run( ["tidevice", "-u", _CF.DEVICE_SERIAL, "fsync", "-B", _CF.APP_NAME, "ls", f"Documents/{file_path}"], capture_output=True, text=True, encoding='utf-8' ) return result.returncode == 0 try: file_path = '/'.join(file_path) if isinstance(file_path, list) else file_path if not directory_exists(file_path): log_out(f"目录 {file_path} 不存在。") return if _CF.PLATFORM == "ios": subprocess.run( ["tidevice", "-u", _CF.DEVICE_SERIAL, "fsync", "-B", _CF.APP_NAME, "rmtree", f"Documents/{file_path}"], check=True ) else: subprocess.run(f"adb -s {_CF.DEVICE_SERIAL} shell rm -r /sdcard/{file_path}", shell=True, check=True) log_out(f"设备上的文件或文件夹 {file_path} 删除成功") except subprocess.CalledProcessError as e: log_out(f"设备上的文件或文件夹 {file_path} 删除失败,原因:{e}", 2) raise
[文档] @cost_time def clear_app(app_name: str = None): """ 清除应用数据 > 仅支持Android平台 :param app_name: 应用名称 :Example: >>> clear_app("com.example.app") # 清除应用数据 :return: """ if _CF.PLATFORM == "android": if app_name is None: app_name = _CF.APP_NAME device.driver.adb_device.app_clear(app_name) else: log_out(f"{clear_app.__name__}仅支持Android平台", 2)
[文档] @cost_time def start_app(app_name: Optional[str] = _CF.APP_NAME, activity: Optional[str] = _CF.ANDROID_ACTIVITY_NAME) -> None: """ 启动应用程序 :param app_name: 应用程序名称,如果为None则使用配置中的默认值 :param activity: Android应用的活动名称,如果为None则使用配置中的默认值 :raises ValueError: 如果是Android平台且activity为None :Example: >>> start_app("com.example.app") # 启动指定应用 """ if _CF.PLATFORM == "ios": if app_name is None: raise ValueError("app_name不能为空") device.driver.app_start(app_name) else: if activity is None: raise ValueError("activity不能为空") device.driver.adb_device.app_start(app_name, activity)
[文档] def get_current_app() -> str: """ 获取当前运行的应用名称 :return: 应用bundleId 或 package name :Example: >>> get_current_app() # 获取当前运行的应用名称 """ if _CF.PLATFORM == "ios": return device.driver.app_current()["bundleId"] else: return device.driver.adb_device.app_current().package
[文档] def restart_app(app_name: Optional[str] = _CF.APP_NAME, activity: Optional[str] = _CF.ANDROID_ACTIVITY_NAME) -> None: """ 重启应用程序并更新CDP连接 :Example: >>> restart_app() # 重启当前应用 """ device.driver.app_stop(app_name) start_app(app_name, activity)
if __name__ == '__main__': pass