用pynput自制远控原型

pynput

Posted by MetaNetworks on June 13, 2022
本页面总访问量

前言

现在远程办公是一大趋势,笔者也使用过向日葵,rustdesk,teamviewer等软件,深深感受到便捷。其中,这些软件一大重要功能,就是获取我们在自己电脑里面鼠标和键盘的逻辑,然后把这些逻辑信息发送给远程。知道了这一逻辑,为何不自己复现复现?

刚好,pynput是python的支持全平台的鼠标、键盘事件库,屏蔽掉平台逻辑差异,简化了上层开发逻辑。

为了简化流程,本篇文章记录如何使用pynput完成鼠标键盘事件的捕获,并利用socket完成事件的传递,以及鼠标键盘操作在远端的复现。

pynput库对代码提示并不友好,很多api IDE提示不出来,这也是笔者不太喜欢python的一大原因。故本文着重记录pynput API的使用以及使用上容易遇到的问题

总体逻辑梳理

计算机A启动server端->计算机A的鼠标或键盘发生操作->生成数据包,使用socket+json传输给计算机B->计算机B解析数据包->计算机B解析后重新模拟键盘/鼠标事件

效果上,当计算机A键入一个按键,计算机B会跟着键入这个按键(模仿操作),计算机A鼠标移动到某处,计算机B的鼠标光标也移动到该处。

事件数据包(Event)定义

键盘

键盘单位事件分为:按下按键、松开按键。故可以用以下数据表示:

1
2
3
4
5
6
7
{
    "type": 0,
    "detail": {
        "action": "release",
        "key": key_data
    }
}

其中type=0表示是鼠标事件,type=1是键盘事件。action分为”press”和”release”,表示按下和松开,key表示按下的是什么按键。

注意:当按下功能键比如F1-F12,pynput传入的是一个Key类;当按下字母按键,pynput捕获的是一个KeyCode类型。传输的时候,由于只能使用基础数据类型,所以可以使用以下代码转成通用的字符串

1
2
3
4
if type(key) is Key:
   key_data = str(key)
elif type(key) is KeyCode:
   key_data = key.char

鼠标事件

鼠标事件共有:移动鼠标到坐标,鼠标滚轮滚动,鼠标按键。

移动鼠标:

其中x, y是offset数据

1
2
3
4
5
6
7
8
{
    "type": 1,
    "detail": {
        "type": "move",
        "x": x,
        "y": y,
    }
}

鼠标滚轮滚动:

其中x,y是滚动时鼠标在屏幕上的位置,dx,dy是单位采样时间,鼠标滚轮滚动幅度。注意:鼠标滚动是有dx,和dy的。按住shift滚动滚轮,页面会变成左右滚动。

1
2
3
4
5
6
7
8
9
10
{
    "type": 1,
    "detail": {
        "type": "scroll",
        "x": x,
        "y": y,
        "dx": dx,
        "dy": dy
    }
}

鼠标点击:

x,y表示点击的坐标,pressed表示是是否是按下鼠标,btn表示是鼠标的键位。

和键盘点击同理。pynput捕获的按钮是Button类,分别为静态的Button.left和right。为了传输方便,直接进行str转字符串。借助python的eval函数很方便的进行解析。

1
2
3
4
5
6
7
8
9
10
{
    "type": 1,
    "detail": {
        "type": "click",
        "x": x,
        "y": y,
        "pressed": pressed,
        "btn": str(btn)
    }
}

服务端逻辑

键盘/鼠标事件监听

pynput将自己分为keyboard和mouse模块。Listener就是对应的监听者,通过传入函数引用完成调用。

1
2
3
4
keyboard_listener = pynput.keyboard.Listener(on_press=on_keyboard_press, on_release=on_keyboard_release)
keyboard_listener.start()
mouse_listener = pynput.mouse.Listener(on_move=on_move, on_scroll=on_scroll, on_click=on_click)
mouse_listener.start()

listener调用与原Thread相同,原理上是开一个新线程完成监听操作并回调。开启使用start函数,停止使用stop()后,通过join()释放多线程资源。

回调函数参数为:

1
2
3
4
5
6
7
8
9
10
def on_click(x: int, y: int, btn: Button, pressed: bool): pass

def on_scroll(x, y, dx, dy): pass

def on_move(x, y): pass

# 键盘
def on_keyboard_press(key: Union[Key|KeyCode]): pass

def on_keyboard_release(key: Union[Key|KeyCode]): pass

在回调后,将数据传入设计的监控类,发给客户端。

服务器端socket逻辑代码(部分)

比较简单,直接贴代码。主要函数为start_listen,外部捕获鼠标键盘事件传入使用action函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class SocketServer:

    def __init__(self):
        sock = socket()
        self.sock_ = sock
        self.client_: [socket] = []
        self.db = DbLogger()
        self.action_lock = Lock()

    def start_listen(self):
        # start logger
        th = Thread(target=self.db.start)
        th.start()
        # start listen
        self.sock_.bind(("0.0.0.0", PORT))
        self.sock_.listen()
        while True:
            connection, addr = self.sock_.accept()
            self.client_.append(connection)
            print("hello client from ", addr)

    def log(self, data: dict, is_client: bool):
        log_queue_lock.acquire()
        if is_client:
            self.db.log_client(data)
        else:
            self.db.log_server(data)
        log_queue_lock.release()

    def action(self, data: dict):
        self.action_lock.acquire()
        try:
            json_str: str = json.dumps(data)
            # print(f"action: {json_str}")
            self.log(data, is_client=False)
            for client in self.client_:
                print(f"触发:给 {client} 发送指令")
                client: socket = client
                if client.fileno() != -1:
                    st_time = time.time()
                    client.send(json_str.encode(encoding="utf-8"))
                    msg: dict = json.loads(client.recv(1024).decode())
                    cost_time = time.time() - st_time
                    msg['cost_time'] = cost_time
                    self.log(msg, is_client=True)
                    print(f"收到client模拟结果:{msg}")
                else:
                    # 跳过断开连接的client
                    print("skip disconnected client`")
                    continue
        except Exception as e:
            print(e)
        finally:
            self.action_lock.release()

    def close(self):
        self.sock_.close()

客户端逻辑

建立与服务器的连接

使用socket.create_connection完成建立,使用json.loads完成数据的json化。在事件处理后,统计执行时间,再回传server。这个比较简单,也是最基础的用法。就不多解释了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def connect_to_server():
    global socket_
    socket_ = socket.create_connection((SERVER_ADDR, PORT))
    while True:
        try:
            bs = socket_.recv(1024)
            st_time = time.time()
            data = json.loads(bs.decode(encoding="utf-8"))
            ty = data['type']
            detail = data['detail']
            res = {}
            if ty == 1:
                # 鼠标事件
关键点>>>        res = handle_mouse_data(detail)
            elif ty == 0:
关键点>>>        res = handle_keyboard_data(detail)
            elapsed = time.time() - st_time
            res['elapsed'] = elapsed
            res['client_name'] = client_name
            socket_.send(json.dumps(res).encode())
        except Exception as e:
            print(e)

解析/模拟鼠标/键盘事件

pynput的Controller用于控制。看完API后,我真的大呼,封装的太好了,完全不用考虑x11, windows, macOS桌面事件差异问题!直接一个函数调用解决问题。直接上代码!

声明代码:

1
2
mouse_controller = pynput.mouse.Controller()
keyboard_controller = pynput.keyboard.Controller()

模拟代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def handle_mouse_data(data: dict) -> dict:
    type = data['type']
    if type == "move":
        mouse_controller.move(data['x'], data['y'])
    elif type == "scroll":
        mouse_controller.scroll(data['dx'], data['dy'])
    elif type == "click":
        mouse_controller.click(eval(data['btn']))
    return data


def handle_keyboard_data(data: dict) -> dict:
    if data["action"] == "press":
        key = data["key"]
        if type(key) == str:
            if key.startswith('Key.'):
                keyboard_controller.press(eval(key))
            else:
                keyboard_controller.press(key)
        else:
            print(f"error key {key}")
    elif data["action"] == "release":
        key = data["key"]
        if type(key) == str:
            if key.startswith('Key.'):
                keyboard_controller.release(eval(key))
            else:
                keyboard_controller.release(key)
        else:
            print(f"error key {key}")
    return data

以下是效果图,使用虚拟机完成多计算机的模拟。

在主机敲入This is the test page后,虚拟机也模拟主机在终端敲入了相同的This is the test page。移动主机鼠标时,虚拟机内的光标同样朝着相同方向移动。

至此一个简单的无界面远控复读机软件完成了。

细节优化

数据写入sqlite问题

python下sqlite只允许在当前connect的线程下进行操作。可以模拟消息队列,sqlite线程定时拉取queue中的消息完成数据库存入。并且这样做还有个好处,可以决定commit时机,优化数据库写入速度。

并发下变量访问问题

通过threading的Lock实现单变量多线程互斥访问。