找回密码
 加入
搜索
查看: 1046|回复: 2

hook PC微信发送文本消息~

[复制链接]
发表于 2024-2-24 21:23:24 | 显示全部楼层 |阅读模式
class DebugWindow:

    def __init__(self, message_queue):
        self.root = tk.Tk()
        self.root.title("Debug Window")

        # 创建一个自定义字体
        custom_font = font.Font(family="Helvetica", size=20)  # 设置字体为Helvetica,大小为12

        self.text_widget = tk.Text(self.root, wrap=tk.WORD, bg="#2B2B2B", fg="#cccccc", font=custom_font)

        self.text_widget.pack()

        self.message_queue = message_queue

    def run(self):
        self._update()

    def _update(self):
        while self.root and self.root.winfo_exists():
            try:
                debug_message, color = self.message_queue.get_nowait()
                # 向主线程发送更新 GUI 的消息
                self.root.event_generate("<<UpdateDebugInfo>>", when="tail")
                # 在主线程中更新 GUI
                self.root.after(0, lambda msg=debug_message, c=color: self.add_debug_info(msg, c))
            except queue.Empty:
                pass

            self.root.update()
            self.root.after(100)  # 延迟以控制更新率

    def on_close(self):
        # 窗口关闭时,向消息队列发送终止信号
        self.message_queue.put((None, None))
        self.root.destroy()

    def add_debug_info(self, debug_str, color):
        self.text_widget.tag_configure(color, foreground=color)
        self.text_widget.insert(tk.END, debug_str, color)
        self.text_widget.insert(tk.END, "\n")


def send_debug_message(debug_str, str_color="#cccccc"):
    message_queue.put((debug_str, str_color))


def debug_message_consumer(message_queue):
    debug_window = DebugWindow(message_queue)
    if debug_window.root:
        debug_window.run()
        # 主循环结束后,向消息队列发送终止信号
        message_queue.put((None, None))


try:
    import ctypes, os, json, re, time, threading, queue
    import tkinter as tk
    from tkinter import font


    class Offsets:
        # Version:3.9.9.43
        ctypes.windll.kernel32.GetModuleHandleW.restype = ctypes.c_void_p
        WeChatWinBase = ctypes.windll.kernel32.GetModuleHandleW('WeChatWin.dll')
        GetContactMgr = 0xA69FD0         # 49 8d 77 ?? 44 39 ?? ?? 0f 8e ?? ?? ?? ?? e8+15
        GetContactList = 0x10B8420       # 48 8d 54 24 ?? 48 8b c8 e8 ?? ?? ?? ?? ff 15+9 字符串搜索 "ContactMgr::getList"
        GetSendMessageMgr = 0xA7C730     # 81 a5 ?? ?? ?? ?? ?? ?? ?? ?? e8 ?? ?? ?? ?? 48 8d 55 ?? 48 8b c8 e8 + 11
        SendTextMsg = 0x11DE090          # 4d 8d 47 ?? 48 8b d3 48 8d 4d ?? e8 ?? ?? ?? ?? 48 8d 4d ?? e8 ?? ?? ?? ?? 48 83 c3 +12
        FreeChatMsg = 0xA7DFB0           # 4d 8d 47 ?? 48 8b d3 48 8d 4d ?? e8 ?? ?? ?? ?? 48 8d 4d ?? e8 ?? ?? ?? ?? 48 83 c3 +21
        ContactsField = {
            "wxid": 0x10,
            "custom_account": 0x30,
            "encrypt_name": 0x50,
            "remark_name": 0x80,
            "verify_flag": 0x70,
            "type": 0x74,
            "nick_name": 0xa0,
            "pinyin": 0x108,
            "pinyin_all": 0x128,
            "reserved1": 0x1f0,
            "reserved2": 0x1f4
        }


    class WeChatString(ctypes.Structure):
        _fields_ = [
            ("ptr", ctypes.POINTER(ctypes.c_wchar)),
            ("length", ctypes.c_ulong),
            ("max_length", ctypes.c_ulong),
            ("c_ptr", ctypes.c_int64),
            ("c_len", ctypes.c_ulong),
        ]

        def __init__(self, s=None):
            super().__init__()
            if s is not None:
                self.ptr = ctypes.create_unicode_buffer(s)
                self.length = len(s)
                self.max_length = len(s)


    class Manager:
        def __init__(self, base_addr):
            self.base_addr = base_addr
            self.friend_info = {}
            self.get_contacts()

        def SendTextToWxid(self, wxid, msg):
            success = -1

            send_message_mgr_addr = self.base_addr + Offsets.GetSendMessageMgr
            send_text_msg_addr = self.base_addr + Offsets.SendTextMsg
            free_chat_msg_addr = self.base_addr + Offsets.FreeChatMsg

            chat_msg = ctypes.create_string_buffer(0x460)
            to_user = WeChatString(wxid)
            text_msg = WeChatString(msg)
            temp = (ctypes.c_uint64 * 3)()

            get_send_message_mgr = ctypes.CFUNCTYPE(ctypes.c_uint64)(send_message_mgr_addr)
            send_text_msg = ctypes.CFUNCTYPE(ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64)(send_text_msg_addr)
            free_chat_msg = ctypes.CFUNCTYPE(ctypes.c_uint64, ctypes.c_uint64)(free_chat_msg_addr)

            get_send_message_mgr()

            send_text_msg(ctypes.addressof(chat_msg), ctypes.addressof(to_user), ctypes.addressof(text_msg), ctypes.addressof(temp), 1, 1, 0, 0, )

            free_chat_msg(ctypes.addressof(chat_msg))
            success = 1
            return success

        @staticmethod
        def read_unicode_string(address):
            if address == 0:
                return ''

            buffer = bytearray()
            length = 0

            try:
                while True:

                    data = ctypes.c_int16.from_address(address + length).value
                    buffer.extend(data.to_bytes(2, byteorder='little', signed=True))
                    length += 2
                    if data == 0:
                        break
            except:
                pass

            return buffer.decode('utf-16-le', errors='ignore').rstrip('\x00')

        def GetFriendByNick(self, nick):
            for wxid in self.friend_info:
                friend = self.friend_info[wxid]
                if friend['昵称'] == nick or re.match('^' + nick + '$', friend['昵称']):
                    return friend
            return None

        def GetFriendByRemark(self, remark):
            for wxid in self.friend_info:
                friend = self.friend_info[wxid]
                if friend['备注'] == remark or re.match('^' + remark + '$', friend['备注']):
                    return friend
            return None

        def SendTextToNick(self, tonick: str, content: str):
            friend = self.GetFriendByNick(tonick)
            if friend:
                return self.SendTextToWxid(friend['wxid'], content)
            return False

        def SendTextToRemark(self, toRemark: str, content: str):
            friend = self.GetFriendByRemark(toRemark)
            if friend:
                return self.SendTextToWxid(friend['wxid'], content)
            return False

        def get_contacts(self):
            success = -1
            get_contact_mgr_addr = self.base_addr + Offsets.GetContactMgr
            get_contact_list_addr = self.base_addr + Offsets.GetContactList
            get_contact_mgr = ctypes.CFUNCTYPE(ctypes.c_uint64)(get_contact_mgr_addr)
            get_contact_list = (ctypes.CFUNCTYPE(ctypes.c_int64, ctypes.c_uint64, ctypes.POINTER(ctypes.c_uint64))
                                (get_contact_list_addr))
            mgr = get_contact_mgr()
            contact_vec = (ctypes.c_uint64 * 3)(0, 0, 0)
            success = get_contact_list(mgr, contact_vec)
            start = contact_vec[0]
            end = contact_vec[2]
            friend_info = {}
            while start < end:
                wxid = self.read_unicode_string(ctypes.c_ulonglong.from_address(start + Offsets.ContactsField["wxid"]).value)
                custom_account = self.read_unicode_string(ctypes.c_ulonglong.from_address(start + Offsets.ContactsField["custom_account"]).value)
                # encrypt_name = self.read_unicode_string(ctypes.c_ulonglong.from_address(start + Offsets.ContactsField["encrypt_name"]).value)
                remark_name = self.read_unicode_string(ctypes.c_ulonglong.from_address(start + Offsets.ContactsField["remark_name"]).value)
                nickname = self.read_unicode_string(ctypes.c_ulonglong.from_address(start + Offsets.ContactsField["nick_name"]).value)
                # pinyin = self.read_unicode_string(ctypes.c_ulonglong.from_address(start + Offsets.ContactsField["pinyin"]).value)
                # pinyin_all = self.read_unicode_string(ctypes.c_ulonglong.from_address(start + Offsets.ContactsField["pinyin_all"]).value)
                # verify_flag = ctypes.c_uint32.from_address(start + Offsets.ContactsField["verify_flag"]).value
                # type = ctypes.c_uint32.from_address(start + Offsets.ContactsField["type"]).value
                # reserved1 = ctypes.c_uint32.from_address(start + Offsets.ContactsField["reserved1"]).value
                # reserved2 = ctypes.c_uint32.from_address(start + Offsets.ContactsField["reserved2"]).value
                friend_info[wxid] = {"wxid": wxid, "微信号": custom_account, "昵称": nickname, "备注": remark_name}
                start += 0x6A8
            self.friend_info = friend_info
            return self.friend_info

        def send_message(self, emp_code, test_x, prefix):
            if emp_code == '9999-陈佩林':
                emp_code = '樱桃-陈佩林'
                test_x = test_x.replace('9999-陈佩林', '樱桃-陈佩林')
            send_error = self.SendTextToRemark(prefix + emp_code, test_x)
            if send_error:
                return emp_code, True
            else:
                return emp_code, False



    message_queue = queue.Queue()
    # 创建一个线程用于运行 Tkinter 主循环
    tk_thread = threading.Thread(target=debug_message_consumer, args=(message_queue,), daemon=True)
    tk_thread.start()
    manager = Manager(Offsets.WeChatWinBase)

    # 在主线程中添加不同颜色的调试信息
    # send_debug_message("Debug message 1: custom_color")

    test_x_list = [
        "\n".join([
                      f"开始:{start_date_str}",
                      f"截至:{end_date_str}",
                      f"★工 号: {item['EmpCodeWithName']}"
                  ] + [f"★{key}: {value}" for key, value in item.items() if key != 'EmpCodeWithName']
                  ) + '\n' + '-' * 100
        for item in send_data_list
    ]

    send_error_list = []
    send_success_list = []
    results = [manager.send_message(item['EmpCodeWithName'], test_x.rstrip('-').rstrip('\n'), prefix) for item, test_x in
               zip(send_data_list, test_x_list)]
    for emp_code, error in results:
        if error:
            send_success_list.append(emp_code)
        else:
            send_error_list.append(emp_code)
    print_str = ""
    if send_success_list:
        send_debug_message('发送成功:' + str(len(send_success_list)) + str(send_success_list), "#20BE67")
        pass
    if send_error_list:
        send_debug_message('发送失败:' + str(len(send_error_list)) + str(send_error_list), "#F59045")
        pass

    tk_thread.join()
except Exception as e:
    send_debug_message("error:%s " % str(e), "#F59045")
    pass

发表于 2024-7-9 18:08:07 | 显示全部楼层
标注一下,慢慢学习
发表于 2024-8-1 23:57:54 | 显示全部楼层
这个利害了。学习学习
您需要登录后才可以回帖 登录 | 加入

本版积分规则

QQ|手机版|小黑屋|AUTOIT CN ( 鲁ICP备19019924号-1 )谷歌 百度

GMT+8, 2024-11-21 14:21 , Processed in 0.071831 second(s), 20 queries .

Powered by Discuz! X3.5 Licensed

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表