#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :server.py
# @Time :2024/8/26 下午10:01
# @Author :CAISHILONG
"""
用于启动 app ,并开启cdp服务,支持pc,android,ios
"""
import importlib.resources
import os
import socket
import subprocess
import sys
import time
import adbutils
import psutil
import requests
from ha4t.utils.log_utils import log_out
def _get_adapter_path() -> str:
"""
获取适配器路径
:return: 适配器路径
:Example:
>>> path = _get_adapter_path() # 获取适配器路径
"""
if sys.version_info < (3, 9):
context = importlib.resources.path("ha4t.binaries", "__init__.py")
else:
ref = importlib.resources.files("ha4t.binaries") / "__init__.py"
context = importlib.resources.as_file(ref)
with context as path:
pass
# Return the dir. We assume that the data files are on a normal dir on the fs.
return str(path.parent)
[文档]
class Server:
"""
window系统进程管理类,主要用于管理服务进程
"""
[文档]
def kill_dead_servers(self, port: int) -> None:
"""
结束死掉的服务器进程
:param port: 需要结束的进程所占用的端口
:Example:
>>> server = Server()
>>> server.kill_dead_servers(9222) # 结束占用9222端口的进程
"""
if pid := self.get_port_exists(port):
log_out(f"正在结束本机进程 {port}, pid {pid}")
cmd = f"taskkill /f /pid {self.get_pid_by_port(port)}"
subprocess.Popen(cmd, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
while self.pid_exists(pid):
time.sleep(0.1)
log_out(f"进程 {port} 已结束, pid {pid}")
[文档]
def kill_pid(self, pid: int) -> None:
"""
结束指定的进程
:param pid: 进程ID
:Example:
>>> server = Server()
>>> server.kill_pid(1234) # 结束PID为1234的进程
"""
log_out(f"正在结束本机进程 pid {pid}")
cmd = f"taskkill /f /pid {pid}"
subprocess.Popen(cmd, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
while self.pid_exists(pid):
time.sleep(0.1)
log_out(f"id {pid} kill success")
[文档]
@classmethod
def find_process_by_name(cls, name: str) -> list:
"""
根据进程名查找进程
:param name: 进程名
:return: 进程信息列表
:Example:
>>> processes = Server.find_process_by_name("chrome") # 查找名为chrome的进程
"""
list_process = []
seen = {} # 用于记录已经添加过的进程,格式为 {(pid, port): True}
for proc in psutil.process_iter(['pid', 'name']):
if name in proc.info['name']:
try:
pid = proc.info['pid']
process = psutil.Process(pid)
connections = process.net_connections()
for conn in connections:
if conn.status == psutil.CONN_LISTEN:
# 检查是否已经添加过该进程的该端口
if (pid, conn.laddr.port) not in seen:
seen[(pid, conn.laddr.port)] = True # 标记为已添加
list_process.append({
"pid": pid,
"name": proc.info['name'],
"port": conn.laddr.port
})
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
return list_process
[文档]
@staticmethod
def get_pid_by_port(port) -> str:
"""
根据端口获取进程ID
:param port: 端口号
:return: 进程ID
:Example:
>>> pid = Server.get_pid_by_port(9222) # 获取占用9222端口的进程ID
"""
cmd = f"netstat -ano | findstr :{port} | findstr LISTENING"
lines = subprocess.check_output(cmd, shell=True).decode().strip().splitlines()
for line in lines:
pid = line.split(" ")[-1]
if pid != 0:
return pid
[文档]
@classmethod
def get_pid(cls, process) -> str:
"""
获取进程的PID
:param process: 进程对象
:return: 进程ID
:Example:
>>> pid = Server.get_pid(process) # 获取进程对象的PID
"""
return process.pid if process else None
[文档]
@staticmethod
def pid_exists(pid) -> bool:
"""
检查进程是否存在
:param pid: 进程ID
:return: 是否存在
:Example:
>>> exists = Server.pid_exists(1234) # 检查PID为1234的进程是否存在
"""
try:
subprocess.check_output(f"ps -p {pid}", shell=True, stderr=subprocess.DEVNULL)
return True
except subprocess.CalledProcessError:
return False
[文档]
@classmethod
def get_port_exists(cls, port) -> bool:
"""
检查端口是否被占用
:param port: 端口号
:return: 是否被占用
:Example:
>>> exists = Server.get_port_exists(9222) # 检查9222端口是否被占用
"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex(('localhost', port)) == 0
[文档]
@classmethod
def wait_connect(cls, port, timeout=10) -> None:
"""
等待连接
:param port: 端口号
:param timeout: 超时时间
:raises TimeoutError: 如果连接超时
:Example:
>>> Server.wait_connect(9222, timeout=10) # 等待连接9222端口,超时10秒
"""
start_time = time.time()
while time.time() - start_time < timeout:
try:
status_code = requests.get(f"http://localhost:{port}/json").status_code
if status_code == 200:
break
except:
pass
if time.time() - start_time > timeout:
raise TimeoutError("连接超时")
time.sleep(0.1)
[文档]
class CdpServer(Server):
def __init__(self, ignore_exist_port=True):
"""
开启H5应用cdp服务,支持pc,android,ios
:param ignore_exist_port: 是否忽略已存在的端口,关闭后每次都会先结束已存在的端口
:Example:
>>> cdp_server = CdpServer(ignore_exist_port=False) # 创建CdpServer实例并设置忽略已存在端口为False
"""
self.ws_endpoint = None
self.ignore_exist_port = ignore_exist_port
self.adapter_pid = None
@staticmethod
def _check_port_connection(port, timeout=10) -> bool:
"""
检查端口连接
:param port: 端口号
:param timeout: 超时时间
:return: 连接是否成功
:Example:
>>> is_connected = CdpServer.check_port_connection(9222) # 检查9222端口的连接
"""
try:
requests.get(f"http://localhost:{port}/json", timeout=timeout)
return True
except requests.RequestException:
return False
def _can_start_server(self, port) -> bool:
"""
检查是否可以启动服务器
:param port: 端口号
:return: 是否可以启动
:Example:
>>> can_start = cdp_server.can_start_server(9222) # 检查是否可以启动9222端口的服务器
"""
if self._check_port_connection(port):
log_out(f"端口{port}已存在")
if self.ignore_exist_port:
log_out(f"忽略端口{port},继续测试")
return False
else:
log_out(f"查询启动端口{port},如需要忽略已存在端口,请设置ignore_exist_port=True")
self.kill_dead_servers(port)
return True
log_out(f"开始{port}CDP端口转发...")
return True
[文档]
def start_server_for_android_app(self, adb: adbutils.AdbDevice, port=9222, timeout=10) -> None:
"""
开启android app cdp服务
:param adb: adb设备
:param port: 端口
:param timeout: 超时时间
:Example:
>>> cdp_server.start_server_for_android_app(adb_device, port=9222) # 启动Android应用的CDP服务
"""
can_start = self._can_start_server(port)
if can_start:
rs: str = adb.shell(['grep', '-a', 'webview_devtools_remote', '/proc/net/unix'])
end = rs.split("@")[-1]
log_out(f"app webview 进程 {end} 已存在,尝试端口转发")
server = adb.forward(local=f"tcp:{port}", remote=f"localabstract:{end}")
self.wait_connect(port, timeout)
self.ws_endpoint = f"http://localhost:{port}"
log_out(f"CDP端口转发成功,端口:{port}")
return server
self.ws_endpoint = f"http://localhost:{port}"
return None
[文档]
def start_server_for_ios_app(self, port=9222, timeout=10) -> None:
"""
开启ios app cdp服务
:param port: 端口
:param timeout: 超时时间
:param use_existing_port: 是否使用已存在的端口
:Example:
>>> cdp_server.start_server_for_ios_app(port=9222) # 启动iOS应用的CDP服务
"""
# 结束已存在的端口
self.kill_dead_servers(port)
log_out("正在查找ios_webkit_debug_proxy进程是否存在")
p_list = self.find_process_by_name('ios_webkit_debug_proxy')
if p_list:
log_out(f"发现ios_webkit_debug_proxy进程,准备结束")
for i in p_list:
self.kill_pid(i['pid'])
else:
log_out("未发现ios_webkit_debug_proxy进程")
# 启动服务
server = subprocess.Popen(
[os.path.join(_get_adapter_path(), "remotedebug_ios_webkit_adapter"), f"--port={str(port)}"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True)
self.wait_connect(port, timeout)
self.ws_endpoint = f"http://localhost:{port}"
log_out(f"CDP端口转发成功,端口:{port}")
self.adapter_pid = server.pid
[文档]
def start_server_for_windows_app(self, app_path: str, port=9222, reset=False, user_data_dir=None, timeout=10,
lang="zh-CN") -> None:
"""
开启windows app cdp服务
:param app_path: 应用路径
:param port: 端口
:param reset: 是否重置用户数据
:param user_data_dir: 用户数据目录
:param timeout: 超时时间
:param lang: 语言
:Example:
>>> cdp_server.start_server_for_windows_app("C:/path/to/app.exe", port=9222) # 启动Windows应用的CDP服务
"""
can_start = self._can_start_server(port=port)
if can_start:
start_app_args = [app_path, f"--remote-debugging-port={port}"]
print(reset)
if reset:
if user_data_dir is None:
user_data_dir = os.path.join(os.path.dirname(__file__), 'app_user_data')
if os.path.exists(user_data_dir):
try:
os.remove(user_data_dir)
print(f"已成功删除用户数据目录: {user_data_dir}")
except PermissionError as e:
print(f"没有权限删除 {user_data_dir}. 错误信息: {e}")
except FileNotFoundError as e:
print(f"找不到文件或目录: {e}")
except Exception as e:
print(f"删除 {user_data_dir} 时发生未知错误: {e}")
start_app_args.append(f"--user-data-dir={user_data_dir}")
start_app_args.append("--no-sandbox")
start_app_args.append(f"--lang={lang}")
log_out(f"启动命令:{start_app_args}")
app_server = subprocess.Popen(start_app_args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
requests.get(f"http://localhost:{port}/json", timeout=timeout)
log_out(f"CDP端口转发成功,端口:{port}")
self.ws_endpoint = f"http://localhost:{port}"
return app_server
self.ws_endpoint = f"http://localhost:{port}"
return None
[文档]
def start_server_for_mac_app(self, file_path: str, port=9222) -> None:
"""
TODO: 这里需要根据macOS的具体情况实现
:param file_path: 应用路径
:param port: 端口
:Example:
>>> cdp_server.start_server_for_mac_app("/path/to/app", port=9222) # 启动macOS应用的CDP服务
"""
pass
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""
退出时清理资源
:param exc_type: 异常类型
:param exc_val: 异常值
:param exc_tb: 异常追踪
:Example:
>>> with CdpServer() as server: # 使用上下文管理器
... server.start_server_for_ios_app(port=9222)
"""
if self.adapter_pid:
subprocess.Popen(f"kill -9 {self.adapter_pid}", shell=True)
if __name__ == '__main__':
server = CdpServer()
server.start_server_for_ios_app(port=9222, timeout=10)