qop1830 发表于 2024-2-24 21:23:24

hook PC微信发送文本消息~

class DebugWindow:

    def __init__(self, message_queue):
      self.root = tk.Tk()
      self.root.title("Debug Window")

      # 创建一个自定义字体
      custom_font = font.Font(family="Helvetica", size=20)# 设置字体为Helvetica,大小为12

      self.text_widget = tk.Text(self.root, wrap=tk.WORD, bg="#2B2B2B", fg="#cccccc", font=custom_font)

      self.text_widget.pack()

      self.message_queue = message_queue

    def run(self):
      self._update()

    def _update(self):
      while self.root and self.root.winfo_exists():
            try:
                debug_message, color = self.message_queue.get_nowait()
                # 向主线程发送更新 GUI 的消息
                self.root.event_generate("<<UpdateDebugInfo>>", when="tail")
                # 在主线程中更新 GUI
                self.root.after(0, lambda msg=debug_message, c=color: self.add_debug_info(msg, c))
            except queue.Empty:
                pass

            self.root.update()
            self.root.after(100)# 延迟以控制更新率

    def on_close(self):
      # 窗口关闭时,向消息队列发送终止信号
      self.message_queue.put((None, None))
      self.root.destroy()

    def add_debug_info(self, debug_str, color):
      self.text_widget.tag_configure(color, foreground=color)
      self.text_widget.insert(tk.END, debug_str, color)
      self.text_widget.insert(tk.END, "\n")


def send_debug_message(debug_str, str_color="#cccccc"):
    message_queue.put((debug_str, str_color))


def debug_message_consumer(message_queue):
    debug_window = DebugWindow(message_queue)
    if debug_window.root:
      debug_window.run()
      # 主循环结束后,向消息队列发送终止信号
      message_queue.put((None, None))


try:
    import ctypes, os, json, re, time, threading, queue
    import tkinter as tk
    from tkinter import font


    class Offsets:
      # Version:3.9.9.43
      ctypes.windll.kernel32.GetModuleHandleW.restype = ctypes.c_void_p
      WeChatWinBase = ctypes.windll.kernel32.GetModuleHandleW('WeChatWin.dll')
      GetContactMgr = 0xA69FD0         # 49 8d 77 ?? 44 39 ?? ?? 0f 8e ?? ?? ?? ?? e8+15
      GetContactList = 0x10B8420       # 48 8d 54 24 ?? 48 8b c8 e8 ?? ?? ?? ?? ff 15+9 字符串搜索 "ContactMgr::getList"
      GetSendMessageMgr = 0xA7C730   # 81 a5 ?? ?? ?? ?? ?? ?? ?? ?? e8 ?? ?? ?? ?? 48 8d 55 ?? 48 8b c8 e8 + 11
      SendTextMsg = 0x11DE090          # 4d 8d 47 ?? 48 8b d3 48 8d 4d ?? e8 ?? ?? ?? ?? 48 8d 4d ?? e8 ?? ?? ?? ?? 48 83 c3 +12
      FreeChatMsg = 0xA7DFB0         # 4d 8d 47 ?? 48 8b d3 48 8d 4d ?? e8 ?? ?? ?? ?? 48 8d 4d ?? e8 ?? ?? ?? ?? 48 83 c3 +21
      ContactsField = {
            "wxid": 0x10,
            "custom_account": 0x30,
            "encrypt_name": 0x50,
            "remark_name": 0x80,
            "verify_flag": 0x70,
            "type": 0x74,
            "nick_name": 0xa0,
            "pinyin": 0x108,
            "pinyin_all": 0x128,
            "reserved1": 0x1f0,
            "reserved2": 0x1f4
      }


    class WeChatString(ctypes.Structure):
      _fields_ = [
            ("ptr", ctypes.POINTER(ctypes.c_wchar)),
            ("length", ctypes.c_ulong),
            ("max_length", ctypes.c_ulong),
            ("c_ptr", ctypes.c_int64),
            ("c_len", ctypes.c_ulong),
      ]

      def __init__(self, s=None):
            super().__init__()
            if s is not None:
                self.ptr = ctypes.create_unicode_buffer(s)
                self.length = len(s)
                self.max_length = len(s)


    class Manager:
      def __init__(self, base_addr):
            self.base_addr = base_addr
            self.friend_info = {}
            self.get_contacts()

      def SendTextToWxid(self, wxid, msg):
            success = -1

            send_message_mgr_addr = self.base_addr + Offsets.GetSendMessageMgr
            send_text_msg_addr = self.base_addr + Offsets.SendTextMsg
            free_chat_msg_addr = self.base_addr + Offsets.FreeChatMsg

            chat_msg = ctypes.create_string_buffer(0x460)
            to_user = WeChatString(wxid)
            text_msg = WeChatString(msg)
            temp = (ctypes.c_uint64 * 3)()

            get_send_message_mgr = ctypes.CFUNCTYPE(ctypes.c_uint64)(send_message_mgr_addr)
            send_text_msg = ctypes.CFUNCTYPE(ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64, ctypes.c_uint64)(send_text_msg_addr)
            free_chat_msg = ctypes.CFUNCTYPE(ctypes.c_uint64, ctypes.c_uint64)(free_chat_msg_addr)

            get_send_message_mgr()

            send_text_msg(ctypes.addressof(chat_msg), ctypes.addressof(to_user), ctypes.addressof(text_msg), ctypes.addressof(temp), 1, 1, 0, 0, )

            free_chat_msg(ctypes.addressof(chat_msg))
            success = 1
            return success

      @staticmethod
      def read_unicode_string(address):
            if address == 0:
                return ''

            buffer = bytearray()
            length = 0

            try:
                while True:

                  data = ctypes.c_int16.from_address(address + length).value
                  buffer.extend(data.to_bytes(2, byteorder='little', signed=True))
                  length += 2
                  if data == 0:
                        break
            except:
                pass

            return buffer.decode('utf-16-le', errors='ignore').rstrip('\x00')

      def GetFriendByNick(self, nick):
            for wxid in self.friend_info:
                friend = self.friend_info
                if friend['昵称'] == nick or re.match('^' + nick + '$', friend['昵称']):
                  return friend
            return None

      def GetFriendByRemark(self, remark):
            for wxid in self.friend_info:
                friend = self.friend_info
                if friend['备注'] == remark or re.match('^' + remark + '$', friend['备注']):
                  return friend
            return None

      def SendTextToNick(self, tonick: str, content: str):
            friend = self.GetFriendByNick(tonick)
            if friend:
                return self.SendTextToWxid(friend['wxid'], content)
            return False

      def SendTextToRemark(self, toRemark: str, content: str):
            friend = self.GetFriendByRemark(toRemark)
            if friend:
                return self.SendTextToWxid(friend['wxid'], content)
            return False

      def get_contacts(self):
            success = -1
            get_contact_mgr_addr = self.base_addr + Offsets.GetContactMgr
            get_contact_list_addr = self.base_addr + Offsets.GetContactList
            get_contact_mgr = ctypes.CFUNCTYPE(ctypes.c_uint64)(get_contact_mgr_addr)
            get_contact_list = (ctypes.CFUNCTYPE(ctypes.c_int64, ctypes.c_uint64, ctypes.POINTER(ctypes.c_uint64))
                              (get_contact_list_addr))
            mgr = get_contact_mgr()
            contact_vec = (ctypes.c_uint64 * 3)(0, 0, 0)
            success = get_contact_list(mgr, contact_vec)
            start = contact_vec
            end = contact_vec
            friend_info = {}
            while start < end:
                wxid = self.read_unicode_string(ctypes.c_ulonglong.from_address(start + Offsets.ContactsField["wxid"]).value)
                custom_account = self.read_unicode_string(ctypes.c_ulonglong.from_address(start + Offsets.ContactsField["custom_account"]).value)
                # encrypt_name = self.read_unicode_string(ctypes.c_ulonglong.from_address(start + Offsets.ContactsField["encrypt_name"]).value)
                remark_name = self.read_unicode_string(ctypes.c_ulonglong.from_address(start + Offsets.ContactsField["remark_name"]).value)
                nickname = self.read_unicode_string(ctypes.c_ulonglong.from_address(start + Offsets.ContactsField["nick_name"]).value)
                # pinyin = self.read_unicode_string(ctypes.c_ulonglong.from_address(start + Offsets.ContactsField["pinyin"]).value)
                # pinyin_all = self.read_unicode_string(ctypes.c_ulonglong.from_address(start + Offsets.ContactsField["pinyin_all"]).value)
                # verify_flag = ctypes.c_uint32.from_address(start + Offsets.ContactsField["verify_flag"]).value
                # type = ctypes.c_uint32.from_address(start + Offsets.ContactsField["type"]).value
                # reserved1 = ctypes.c_uint32.from_address(start + Offsets.ContactsField["reserved1"]).value
                # reserved2 = ctypes.c_uint32.from_address(start + Offsets.ContactsField["reserved2"]).value
                friend_info = {"wxid": wxid, "微信号": custom_account, "昵称": nickname, "备注": remark_name}
                start += 0x6A8
            self.friend_info = friend_info
            return self.friend_info

      def send_message(self, emp_code, test_x, prefix):
            if emp_code == '9999-陈佩林':
                emp_code = '樱桃-陈佩林'
                test_x = test_x.replace('9999-陈佩林', '樱桃-陈佩林')
            send_error = self.SendTextToRemark(prefix + emp_code, test_x)
            if send_error:
                return emp_code, True
            else:
                return emp_code, False



    message_queue = queue.Queue()
    # 创建一个线程用于运行 Tkinter 主循环
    tk_thread = threading.Thread(target=debug_message_consumer, args=(message_queue,), daemon=True)
    tk_thread.start()
    manager = Manager(Offsets.WeChatWinBase)

    # 在主线程中添加不同颜色的调试信息
    # send_debug_message("Debug message 1: custom_color")

    test_x_list = [
      "\n".join([
                      f"开始:{start_date_str}",
                      f"截至:{end_date_str}",
                      f"★工 号: {item['EmpCodeWithName']}"
                  ] +
                  ) + '\n' + '-' * 100
      for item in send_data_list
    ]

    send_error_list = []
    send_success_list = []
    results = , test_x.rstrip('-').rstrip('\n'), prefix) for item, test_x in
               zip(send_data_list, test_x_list)]
    for emp_code, error in results:
      if error:
            send_success_list.append(emp_code)
      else:
            send_error_list.append(emp_code)
    print_str = ""
    if send_success_list:
      send_debug_message('发送成功:' + str(len(send_success_list)) + str(send_success_list), "#20BE67")
      pass
    if send_error_list:
      send_debug_message('发送失败:' + str(len(send_error_list)) + str(send_error_list), "#F59045")
      pass

    tk_thread.join()
except Exception as e:
    send_debug_message("error:%s " % str(e), "#F59045")
    pass



teq 发表于 2024-7-9 18:08:07

标注一下,慢慢学习

dgmax 发表于 2024-8-1 23:57:54

这个利害了。学习学习
页: [1]
查看完整版本: hook PC微信发送文本消息~