hook PC微信发送文本消息~
class DebugWindow:
    """Tk window that shows colored debug lines pulled from a message queue.

    Queue items are ``(text, color)`` tuples. ``color`` is used both as the
    name of a text tag and as that tag's foreground color. A tuple whose
    text is ``None`` is the termination sentinel and is never displayed.

    ``run`` pumps the Tk event loop by hand with ``root.update()``, so it
    has to be called from the thread that constructed the window.
    """

    # Pause between two polls of the message queue, in milliseconds.
    POLL_INTERVAL_MS = 100

    def __init__(self, message_queue):
        """Create the window and its text widget.

        Args:
            message_queue: object with ``get_nowait()``/``put()`` (a
                ``queue.Queue``) carrying ``(text, color)`` tuples.
        """
        self.root = tk.Tk()
        self.root.title("Debug Window")
        # Custom font: Helvetica, size 20.
        custom_font = font.Font(family="Helvetica", size=20)
        self.text_widget = tk.Text(
            self.root, wrap=tk.WORD, bg="#2B2B2B", fg="#cccccc", font=custom_font
        )
        self.text_widget.pack()
        self.message_queue = message_queue
        # Route the window-manager close button through on_close; without
        # this registration on_close was never invoked.
        self.root.protocol("WM_DELETE_WINDOW", self.on_close)

    def run(self):
        """Poll the queue and process Tk events until the window is closed."""
        self._update()

    def _update(self):
        """Polling loop: show at most one queued message per tick."""
        while self.root:
            try:
                if not self.root.winfo_exists():
                    break
                self._show_next_message()
                self.root.update()
                # after(ms) without a callback just blocks for that long;
                # it throttles the polling rate.
                self.root.after(self.POLL_INTERVAL_MS)
            except tk.TclError:
                # The window was destroyed (for example from on_close while
                # update() was dispatching events); Tk calls on a destroyed
                # application raise TclError, so stop polling.
                break

    def _show_next_message(self):
        """Pop one message from the queue, if any, and append it to the widget."""
        try:
            debug_message, color = self.message_queue.get_nowait()
        except queue.Empty:
            return
        if debug_message is None:
            # Termination sentinel, not something to display.
            return
        # This loop already runs in the Tk thread, so the widget can be
        # updated directly.
        self.add_debug_info(debug_message, color)

    def on_close(self):
        """Publish the termination sentinel and destroy the window."""
        self.message_queue.put((None, None))
        self.root.destroy()

    def add_debug_info(self, debug_str, color):
        """Append ``debug_str`` plus a newline, drawn in ``color``.

        Args:
            debug_str: text to append.
            color: Tk color string; also used as the tag name.
        """
        self.text_widget.tag_configure(color, foreground=color)
        self.text_widget.insert(tk.END, debug_str, color)
        self.text_widget.insert(tk.END, "\n")
def send_debug_message(debug_str, str_color="#cccccc"):
    """Queue a debug line for display in the debug window.

    Args:
        debug_str: text to display.
        str_color: color string for the line; defaults to light grey.

    The module-level ``message_queue`` is created late in the script, while
    the script's error handler may call this function earlier than that.
    In that case the text is printed instead, so the original error is not
    replaced by a ``NameError``.
    """
    try:
        target_queue = message_queue
    except NameError:
        print(debug_str)
        return
    target_queue.put((debug_str, str_color))
def debug_message_consumer(message_queue):
    """Thread target: show the debug window until it is closed.

    Args:
        message_queue: queue of ``(text, color)`` tuples passed on to the
            window.

    When the window loop returns, a ``(None, None)`` termination sentinel
    is put on the queue.
    """
    window = DebugWindow(message_queue)
    if window.root:
        window.run()
    # The window loop is over: publish the termination sentinel.
    sentinel = (None, None)
    message_queue.put(sentinel)
try:
import ctypes, os, json, re, time, threading, queue
import tkinter as tk
from tkinter import font
class Offsets:
    """Module base address, function offsets and contact-field offsets.

    Function offsets are added to ``WeChatWinBase`` by the callers. The
    trailing comment on each one is the byte signature (``??`` = wildcard)
    and displacement recorded by the original author for locating it.
    """

    # Version:3.9.9.43

    # The default ctypes return type is a C int; declare a pointer-sized
    # return type before calling so the handle is not truncated.
    ctypes.windll.kernel32.GetModuleHandleW.restype = ctypes.c_void_p
    WeChatWinBase = ctypes.windll.kernel32.GetModuleHandleW('WeChatWin.dll')

    GetContactMgr = 0xA69FD0  # 49 8d 77 ?? 44 39 ?? ?? 0f 8e ?? ?? ?? ?? e8+15
    GetContactList = 0x10B8420  # 48 8d 54 24 ?? 48 8b c8 e8 ?? ?? ?? ?? ff 15+9 string search "ContactMgr::getList"
    GetSendMessageMgr = 0xA7C730  # 81 a5 ?? ?? ?? ?? ?? ?? ?? ?? e8 ?? ?? ?? ?? 48 8d 55 ?? 48 8b c8 e8 + 11
    SendTextMsg = 0x11DE090  # 4d 8d 47 ?? 48 8b d3 48 8d 4d ?? e8 ?? ?? ?? ?? 48 8d 4d ?? e8 ?? ?? ?? ?? 48 83 c3 +12
    FreeChatMsg = 0xA7DFB0  # 4d 8d 47 ?? 48 8b d3 48 8d 4d ?? e8 ?? ?? ?? ?? 48 8d 4d ?? e8 ?? ?? ?? ?? 48 83 c3 +21

    # Byte offset of each field inside one contact record.
    ContactsField = {
        "wxid": 0x10,
        "custom_account": 0x30,
        "encrypt_name": 0x50,
        "remark_name": 0x80,
        "verify_flag": 0x70,
        "type": 0x74,
        "nick_name": 0xa0,
        "pinyin": 0x108,
        "pinyin_all": 0x128,
        "reserved1": 0x1f0,
        "reserved2": 0x1f4,
    }
class WeChatString(ctypes.Structure):
    """ctypes mirror of the wide-string structure passed to WeChat functions.

    Fields:
        ptr: pointer to the wide-character data.
        length: number of wide characters in use (terminator excluded).
        max_length: capacity in wide characters; set equal to ``length``.
        c_ptr, c_len: left zero by this class.
    """

    _fields_ = [
        ("ptr", ctypes.POINTER(ctypes.c_wchar)),
        ("length", ctypes.c_ulong),
        ("max_length", ctypes.c_ulong),
        ("c_ptr", ctypes.c_int64),
        ("c_len", ctypes.c_ulong),
    ]

    def __init__(self, s=None):
        """Build an empty structure, or one pointing at a copy of ``s``.

        Args:
            s: Python string to wrap, or ``None`` for a zeroed structure.
        """
        super().__init__()
        if s is not None:
            buffer = ctypes.create_unicode_buffer(s)
            # Count wchar_t units rather than code points: where wchar_t is
            # 16 bits, a non-BMP character (most emoji) occupies two units,
            # so len(s) would under-report the length. The buffer includes
            # one trailing NUL, which is not part of the string.
            unit_count = len(buffer) - 1
            self.ptr = buffer
            self.length = unit_count
            self.max_length = unit_count
class Manager:
    """Calls functions inside the loaded WeChatWin module.

    On construction the contact list is read once into ``friend_info``, a
    dict mapping each wxid to a record with the keys ``"wxid"``, ``"微信号"``
    (custom account), ``"昵称"`` (nickname) and ``"备注"`` (remark name).
    """

    # Distance in bytes between two consecutive contact records.
    _CONTACT_STRIDE = 0x6A8
    # Size in bytes of the buffer passed as first argument to SendTextMsg.
    _CHAT_MSG_SIZE = 0x460

    def __init__(self, base_addr):
        """Store the module base address and load the contact list.

        Args:
            base_addr: base address of WeChatWin.dll in this process.

        Raises:
            RuntimeError: if ``base_addr`` is ``None`` or 0, which is what
                the module-handle lookup yields when the DLL is not loaded.
        """
        if not base_addr:
            raise RuntimeError(
                "WeChatWin.dll base address is missing; "
                "this code must run inside the WeChat process"
            )
        self.base_addr = base_addr
        self.friend_info = {}
        self.get_contacts()

    def SendTextToWxid(self, wxid, msg):
        """Send the text ``msg`` to the contact identified by ``wxid``.

        Returns:
            1 once the native calls have returned. Any exception raised by
            ctypes propagates to the caller.
        """
        u64 = ctypes.c_uint64
        get_send_message_mgr = ctypes.CFUNCTYPE(u64)(
            self.base_addr + Offsets.GetSendMessageMgr
        )
        send_text_msg = ctypes.CFUNCTYPE(u64, u64, u64, u64, u64, u64, u64, u64, u64)(
            self.base_addr + Offsets.SendTextMsg
        )
        free_chat_msg = ctypes.CFUNCTYPE(u64, u64)(
            self.base_addr + Offsets.FreeChatMsg
        )

        # These objects must stay referenced until the native calls return,
        # because only their raw addresses are handed over.
        chat_msg = ctypes.create_string_buffer(self._CHAT_MSG_SIZE)
        to_user = WeChatString(wxid)
        text_msg = WeChatString(msg)
        temp = (u64 * 3)()

        # The return value is not used.
        # NOTE(review): presumably called so the manager is initialised
        # before sending; confirm.
        get_send_message_mgr()
        # The four trailing integer arguments are passed exactly as in the
        # original code; their meaning is not documented in this file.
        send_text_msg(
            ctypes.addressof(chat_msg),
            ctypes.addressof(to_user),
            ctypes.addressof(text_msg),
            ctypes.addressof(temp),
            1,
            1,
            0,
            0,
        )
        free_chat_msg(ctypes.addressof(chat_msg))
        return 1

    @staticmethod
    def read_unicode_string(address):
        """Read a NUL-terminated UTF-16-LE string starting at ``address``.

        Args:
            address: integer memory address; 0 yields ``''``.

        Returns:
            The decoded text without its terminator. Undecodable data is
            dropped. If ctypes rejects the address, whatever was read up to
            that point is returned.
        """
        if not address:
            return ''
        raw = bytearray()
        offset = 0
        try:
            while True:
                unit = ctypes.c_uint16.from_address(address + offset).value
                if unit == 0:
                    break
                raw.extend(unit.to_bytes(2, byteorder='little'))
                offset += 2
        except (TypeError, ValueError, OverflowError):
            # Best effort, as in the original code. NOTE(review): reading an
            # unmapped address is likely to crash the process rather than
            # raise, so this only covers addresses ctypes itself rejects.
            pass
        return raw.decode('utf-16-le', errors='ignore')

    def _find_friend(self, field, wanted):
        """Return the first contact whose ``field`` equals or matches ``wanted``.

        ``wanted`` is tried as an exact value first and then as a regular
        expression anchored with ``^``/``$``. If it is not a valid regular
        expression only the exact comparison is used.
        """
        try:
            pattern = re.compile('^' + wanted + '$')
        except re.error:
            pattern = None
        for friend in self.friend_info.values():
            value = friend[field]
            if value == wanted:
                return friend
            if pattern is not None and pattern.match(value):
                return friend
        return None

    def GetFriendByNick(self, nick):
        """Return the contact record whose nickname matches ``nick``, or None."""
        return self._find_friend('昵称', nick)

    def GetFriendByRemark(self, remark):
        """Return the contact record whose remark name matches ``remark``, or None."""
        return self._find_friend('备注', remark)

    def SendTextToNick(self, tonick: str, content: str):
        """Send ``content`` to the contact with nickname ``tonick``.

        Returns:
            The result of ``SendTextToWxid``, or ``False`` if no contact
            matches.
        """
        friend = self.GetFriendByNick(tonick)
        if friend:
            return self.SendTextToWxid(friend['wxid'], content)
        return False

    def SendTextToRemark(self, toRemark: str, content: str):
        """Send ``content`` to the contact with remark name ``toRemark``.

        Returns:
            The result of ``SendTextToWxid``, or ``False`` if no contact
            matches.
        """
        friend = self.GetFriendByRemark(toRemark)
        if friend:
            return self.SendTextToWxid(friend['wxid'], content)
        return False

    def get_contacts(self):
        """Read the contact list from WeChat into ``self.friend_info``.

        Returns:
            The new ``friend_info`` dict, keyed by wxid.
        """
        get_contact_mgr = ctypes.CFUNCTYPE(ctypes.c_uint64)(
            self.base_addr + Offsets.GetContactMgr
        )
        get_contact_list = ctypes.CFUNCTYPE(
            ctypes.c_int64, ctypes.c_uint64, ctypes.POINTER(ctypes.c_uint64)
        )(self.base_addr + Offsets.GetContactList)

        mgr = get_contact_mgr()
        contact_vec = (ctypes.c_uint64 * 3)(0, 0, 0)
        get_contact_list(mgr, contact_vec)

        # NOTE(review): the subscripts were lost in the pasted source.
        # [0]/[1] assumes the three slots are (begin, end, end-of-storage);
        # confirm against the reference implementation.
        start = contact_vec[0]
        end = contact_vec[1]

        fields = Offsets.ContactsField
        read_ptr = ctypes.c_ulonglong.from_address
        friend_info = {}
        while start < end:
            # Each field slot holds a pointer to the string data.
            wxid = self.read_unicode_string(read_ptr(start + fields["wxid"]).value)
            custom_account = self.read_unicode_string(
                read_ptr(start + fields["custom_account"]).value
            )
            remark_name = self.read_unicode_string(
                read_ptr(start + fields["remark_name"]).value
            )
            nickname = self.read_unicode_string(
                read_ptr(start + fields["nick_name"]).value
            )
            # One record per contact, keyed by wxid.
            friend_info[wxid] = {
                "wxid": wxid,
                "微信号": custom_account,
                "昵称": nickname,
                "备注": remark_name,
            }
            start += self._CONTACT_STRIDE
        self.friend_info = friend_info
        return self.friend_info

    def send_message(self, emp_code, test_x, prefix):
        """Send ``test_x`` to the contact whose remark is ``prefix + emp_code``.

        The code ``'9999-陈佩林'`` is rewritten to ``'樱桃-陈佩林'``, both as the
        lookup key and inside the message text, before sending.

        Returns:
            ``(emp_code, sent)`` where ``emp_code`` is the possibly
            rewritten code and ``sent`` is True when the send call reported
            success.
        """
        if emp_code == '9999-陈佩林':
            emp_code = '樱桃-陈佩林'
            test_x = test_x.replace('9999-陈佩林', '樱桃-陈佩林')
        sent = bool(self.SendTextToRemark(prefix + emp_code, test_x))
        return emp_code, sent
# Script body: start the debug window thread, build the Manager, send one
# message per entry of send_data_list and report the outcome in the window.
# NOTE(review): start_date_str, end_date_str, send_data_list and prefix are
# not defined anywhere in the visible source.
message_queue = queue.Queue()
# Create a thread that runs the Tkinter main loop.
tk_thread = threading.Thread(target=debug_message_consumer, args=(message_queue,), daemon=True)
tk_thread.start()
manager = Manager(Offsets.WeChatWinBase)
# Add debug messages in different colors from the main thread.
# send_debug_message("Debug message 1: custom_color")
# One message text per item, followed by a newline and a line of 100 dashes.
# NOTE(review): the operand after "] +" is missing from the pasted source,
# so this expression is incomplete as written.
test_x_list = [
"\n".join([
f"开始:{start_date_str}",
f"截至:{end_date_str}",
f"★工 号: {item['EmpCodeWithName']}"
] +
) + '\n' + '-' * 100
for item in send_data_list
]
send_error_list = []
send_success_list = []
# NOTE(review): the start of this list comprehension is missing from the
# pasted source. The loop below unpacks each result as a pair, which matches
# what Manager.send_message returns; confirm the original expression.
results = , test_x.rstrip('-').rstrip('\n'), prefix) for item, test_x in
zip(send_data_list, test_x_list)]
# Despite its name, "error" is truthy when the send succeeded.
for emp_code, error in results:
if error:
send_success_list.append(emp_code)
else:
send_error_list.append(emp_code)
print_str = ""
if send_success_list:
send_debug_message('发送成功:' + str(len(send_success_list)) + str(send_success_list), "#20BE67")
pass
if send_error_list:
send_debug_message('发送失败:' + str(len(send_error_list)) + str(send_error_list), "#F59045")
pass
# Block until the debug window thread has finished.
tk_thread.join()
except Exception as e:
# Report any failure from the try block in the debug window.
send_debug_message("error:%s " % str(e), "#F59045")
pass
标注一下,慢慢学习。这个厉害了。学习学习。
页:
[1]