# notify_win/client/notification_client.py
# 2025-02-28 15:11:02 +08:00
#
# 693 lines
# 27 KiB
# Python
# (Repository viewer links: Raw / Blame / History)
#
# Viewer notice: this file contains ambiguous Unicode characters.
#
# It contains Unicode characters that might be confused with other characters. If this is intentional, the warning can safely be ignored; the viewer's Escape button reveals the characters.

import asyncio
import websockets
import json
import pystray
from PIL import Image
import sys
import yaml
import logging
from datetime import datetime
import os
import argparse
import logging.handlers
from collections import deque, defaultdict, Counter
from winotify import Notification # 添加到文件开头的导入部分
import tkinter as tk
from tkinter import ttk
import threading
# Optional dependency: win10toast. USE_WIN10TOAST records whether it could be
# imported; `toaster` is the shared ToastNotifier instance, or None when the
# package is unavailable (so the name is always defined at module level).
try:
    from win10toast import ToastNotifier
    USE_WIN10TOAST = True
    toaster = ToastNotifier()
except ImportError:
    USE_WIN10TOAST = False
    toaster = None
# Logging setup: one module logger at INFO level that writes every record to
# the console and to a size-rotated log file in the working directory.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Console output.
console_handler = logging.StreamHandler()

# File output: rotate at 1 MB and keep five old files.
file_handler = logging.handlers.RotatingFileHandler(
    'notification_client.log',
    maxBytes=1024 * 1024,
    backupCount=5,
    encoding='utf-8',
)

# Both handlers use the same record layout and are attached console-first.
for _handler in (console_handler, file_handler):
    _handler.setFormatter(
        logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    )
    logger.addHandler(_handler)
del _handler
# 添加自定义通知框类
class CustomNotificationWindow:
    """Borderless, always-on-top tkinter popup that shows one notification.

    Constructing an instance builds the window, registers it with the
    optional ``client`` and then blocks inside ``mainloop()`` until the popup
    is closed (close button, Escape key, or the auto-close timer).

    When constructed from any thread other than the main thread the
    constructor does nothing except set ``window`` to ``None``.

    Args:
        title: Text shown in bold in the draggable title bar.
        message: Body text, wrapped at 300 pixels.
        duration: Milliseconds after which the popup closes itself. ``None``
            or a value ``<= 0`` disables the timer.
        position: Optional ``(x, y)`` screen position for the window.
        client: Optional owner object. Only ``notification_windows``,
            ``last_notification_pos``, ``get_next_notification_y()`` and
            ``rearrange_notifications()`` are used.
    """

    def __init__(self, title, message, duration=3000, position=None, client=None):
        # Keep the owner and the requested position.
        self.client = client
        self.position = position
        # Always defined, so close() is safe even if construction stops below.
        self.window = None
        self._drag_start_x = 0
        self._drag_start_y = 0

        # tkinter is only driven from the main thread here.
        if threading.current_thread() is not threading.main_thread():
            return

        self.window = tk.Tk()
        self.window.withdraw()  # stay hidden until fully laid out
        self.window.overrideredirect(True)
        self.window.attributes('-topmost', True)
        self.window.configure(bg='#f0f0f0')

        self._build_widgets(title, message)
        self._place_window()

        # Register with the owner so later popups can stack relative to this one.
        if self.client:
            self.client.notification_windows.append(self)

        self._bind_events()

        # Honour the requested display time; without this timer the popup
        # would stay open until the user closed it by hand.
        if duration is not None and duration > 0:
            self.window.after(int(duration), self.close)

        self.window.deiconify()
        self.window.mainloop()

    def _build_widgets(self, title, message):
        """Create the title bar (title + close button) and the message body."""
        self.frame = tk.Frame(self.window, bg='#f0f0f0')
        self.frame.pack(fill='both', expand=True)

        # Title bar; it doubles as the drag handle.
        self.title_frame = tk.Frame(self.frame, bg='#f0f0f0', cursor='hand2')
        self.title_frame.pack(fill='x', expand=True, padx=10, pady=(10, 0))

        self.title_label = tk.Label(self.title_frame,
                                    text=title,
                                    font=('Arial', 10, 'bold'),
                                    bg='#f0f0f0',
                                    fg='#333333')
        self.title_label.pack(side='left', anchor='w')

        self.close_button = tk.Label(self.title_frame,
                                     text='×',
                                     font=('Arial', 12, 'bold'),
                                     bg='#f0f0f0',
                                     fg='#666666',
                                     cursor='hand2',
                                     padx=5)
        self.close_button.pack(side='right', anchor='e')

        # Message body.
        self.message_frame = tk.Frame(self.frame, bg='#f0f0f0')
        self.message_frame.pack(fill='both', expand=True, padx=10, pady=10)

        self.message_label = tk.Label(self.message_frame,
                                      text=message,
                                      wraplength=300,
                                      justify='left',
                                      bg='#f0f0f0',
                                      fg='#666666')
        self.message_label.pack(anchor='w')

    def _place_window(self):
        """Measure the popup and move it to its screen position."""
        self.window.update_idletasks()
        # The window is still withdrawn, so read the size requested by the
        # packed widgets; the actual width/height may not be assigned until
        # the window has been mapped.
        self.width = self.window.winfo_reqwidth()
        self.height = self.window.winfo_reqheight()

        if self.position:
            x, y = self.position
        else:
            # Default: bottom-right corner with a small margin.
            x = self.window.winfo_screenwidth() - self.width - 20
            y = self.window.winfo_screenheight() - self.height - 40

        # Let the owner choose the vertical slot to avoid overlapping popups.
        if self.client:
            y = self.client.get_next_notification_y(self.height)

        self.window.geometry(f'+{x}+{y}')

    def _bind_events(self):
        """Bind close, hover, drag and keyboard events."""
        # Close button: click closes, hover highlights in red.
        self.close_button.bind('<Button-1>', lambda e: self.close())
        self.close_button.bind(
            '<Enter>', lambda e: self.close_button.configure(fg='#ff0000'))
        self.close_button.bind(
            '<Leave>', lambda e: self.close_button.configure(fg='#666666'))
        # Dragging works from both the title frame and the title label.
        for handle in (self.title_frame, self.title_label):
            handle.bind('<Button-1>', self.start_move)
            handle.bind('<B1-Motion>', self.on_move)
        # Escape closes the popup.
        self.window.bind('<Escape>', lambda e: self.close())

    def start_move(self, event):
        """Record the pointer offset inside the window when a drag starts."""
        self._drag_start_x = event.x_root - self.window.winfo_x()
        self._drag_start_y = event.y_root - self.window.winfo_y()

    def on_move(self, event):
        """Move the window with the pointer and report the position to the client."""
        x = event.x_root - self._drag_start_x
        y = event.y_root - self._drag_start_y
        self.window.geometry(f'+{x}+{y}')
        if self.client:
            self.client.last_notification_pos = (x, y)

    def close(self, event=None):
        """Destroy the popup and unregister it from the client.

        Safe to call more than once and on an instance whose window was never
        created. Errors are printed rather than raised because this runs as a
        tkinter callback.
        """
        try:
            if not self.window:
                return
            if self.client:
                # Remember where the popup was so the next one can reuse the spot.
                self.client.last_notification_pos = (
                    self.window.winfo_x(),
                    self.window.winfo_y(),
                )
                if self in self.client.notification_windows:
                    self.client.notification_windows.remove(self)
                # Close the gap left in the stack of remaining popups.
                self.client.rearrange_notifications()
            self.window.destroy()
            self.window = None
        except Exception as e:
            print(f"关闭窗口时出错: {e}")
class NotificationClient:
    """WebSocket notification client driven from a system-tray icon.

    The client keeps a connection to the configured server, shows every
    received message either as a system toast or as a custom tkinter popup,
    records a bounded history on disk and exposes its actions through a
    pystray menu.
    """

    def __init__(self, config_path='config.yml'):
        """Load configuration, key and history and initialise runtime state.

        Args:
            config_path: Path of the YAML configuration file. It is created
                with default values when it does not exist.
        """
        self.logger = logger  # set first: load_config() already logs
        self.config_path = config_path
        self.config_mtime = 0
        self.load_config()
        self.running = True
        self.key_file = 'client_key.txt'
        self.key = self.load_key()
        self.connected = False
        self.status_text = "未连接"  # text of the status menu entry
        self.status_item = None
        self.icon = None  # created by create_tray_icon()
        self.history_file = 'notification_history.json'
        self.history = deque(maxlen=100)  # keep at most 100 entries
        self.load_history()
        self.notification_filters = []
        self.notification_groups = defaultdict(list)
        self.notification_stats = {
            'total': 0,
            'by_type': Counter(),
            'by_hour': Counter(),
            'filtered': 0
        }
        self.notification_mode = 'system'  # 'system' toast or 'custom' popup
        self.notification_windows = []  # custom popups currently on screen
        self.last_notification_pos = None  # last known popup position (x, y)
        self.notification_spacing = 10  # vertical gap between stacked popups
        # Set by connect_websocket(); lets reconnect() wake the retry wait.
        self._loop = None
        self._reconnect_event = None

    @staticmethod
    def _default_config():
        """Return a fresh copy of the built-in default configuration."""
        return {
            'server': {
                'host': '127.0.0.1',
                'port': 8080
            },
            'client': {
                'device_name': '我的设备',
                'app_id': 'Python.Notification'
            },
            'security': {
                'use_ssl': False
            },
            'notification': {
                'duration': {
                    'info': "short",
                    'warning': "long",
                    'error': "long"
                },
                'sounds': {
                    'enabled': True,
                    'info': "default",
                    'warning': "warning.wav",
                    'error': "error.wav"
                }
            }
        }

    def load_config(self):
        """Load the configuration file, or create it with defaults.

        The file is only re-read when its modification time is newer than the
        one recorded at the previous load.

        Returns:
            True when the configuration was (re)loaded or freshly created,
            a falsy value when nothing changed or loading failed.
        """
        try:
            if os.path.exists(self.config_path):
                mtime = os.path.getmtime(self.config_path)
                if mtime <= self.config_mtime:
                    return False
                with open(self.config_path, 'r', encoding='utf-8') as f:
                    loaded = yaml.safe_load(f)
                # An empty or non-mapping file would break every later lookup.
                if not isinstance(loaded, dict):
                    raise ValueError("配置文件内容无效")
                self.config = loaded
                self.config_mtime = mtime
                self.logger.info("配置已重新加载")
                return True

            # No file yet: start from the defaults and persist them.
            self.config = self._default_config()
            with open(self.config_path, 'w', encoding='utf-8') as f:
                yaml.safe_dump(self.config, f, allow_unicode=True)
            self.logger.info("已创建默认配置文件")
            return True
        except Exception as e:
            self.logger.error(f"加载配置失败: {e}")
            # Never leave the client without a usable configuration.
            if not hasattr(self, 'config'):
                self.config = self._default_config()
            return False

    def load_key(self):
        """Return the stored client key, or None when it cannot be read."""
        try:
            if os.path.exists(self.key_file):
                with open(self.key_file, 'r', encoding='utf-8') as f:
                    return f.read().strip()
        except Exception as e:
            self.logger.error(f"加载密钥失败: {e}")
        return None

    def save_key(self, key):
        """Write the client key to the key file; failures are only logged."""
        try:
            with open(self.key_file, 'w', encoding='utf-8') as f:
                f.write(key)
        except Exception as e:
            self.logger.error(f"保存密钥失败: {e}")

    def load_history(self):
        """Load the notification history from disk (last 100 entries)."""
        try:
            if os.path.exists(self.history_file):
                with open(self.history_file, 'r', encoding='utf-8') as f:
                    stored = json.load(f)
                # Only a JSON array is a valid history.
                if isinstance(stored, list):
                    self.history = deque(stored, maxlen=100)
                else:
                    self.logger.error("加载历史记录失败: 文件格式无效")
        except Exception as e:
            self.logger.error(f"加载历史记录失败: {e}")

    def save_history(self):
        """Write the notification history to disk; failures are only logged."""
        try:
            with open(self.history_file, 'w', encoding='utf-8') as f:
                json.dump(list(self.history), f, ensure_ascii=False, indent=2)
        except Exception as e:
            self.logger.error(f"保存历史记录失败: {e}")

    def add_filter(self, filter_func):
        """Register a filter callable taking (title, message, notification_type)."""
        self.notification_filters.append(filter_func)

    def should_show_notification(self, title, message, notification_type):
        """Return True unless any registered filter rejects the notification."""
        return all(
            filter_func(title, message, notification_type)
            for filter_func in self.notification_filters
        )

    def group_notification(self, title, message, notification_type):
        """Record the notification under its title and return the text to show.

        When the group already holds other messages, a suffix with the number
        of similar messages (counted over the five most recent entries) is
        appended to ``message``.
        """
        group_key = title  # grouping rule: same title, same group
        group = self.notification_groups[group_key]
        group.append({
            'message': message,
            'type': notification_type,
            'time': datetime.now()
        })
        if len(group) > 1:
            messages = [entry['message'] for entry in group[-5:]]
            return f"{message}\n\n还有 {len(messages)-1} 条相似消息"
        return message

    def show_custom_notification(self, title, message, duration):
        """Show a custom popup, directly or through a daemon thread.

        When a tray icon exists the popup is constructed in the calling
        thread; otherwise construction is handed to a new daemon thread.

        Args:
            title: Popup title.
            message: Popup body text.
            duration: Display time in milliseconds.
        """
        window_kwargs = {
            'position': self.last_notification_pos,
            'client': self,
        }
        if getattr(self, 'icon', None):
            CustomNotificationWindow(title, message, duration, **window_kwargs)
            return
        self.logger.info("在主线程中创建通知窗口")
        threading.Thread(
            target=CustomNotificationWindow,
            args=(title, message, duration),
            kwargs=window_kwargs,
            daemon=True,
        ).start()

    def _resolve_duration(self, notification_type='info'):
        """Return "long" or "short" for a notification of the given type.

        ``notification.default_duration`` (written by the tray menu) wins when
        present; otherwise the per-type ``notification.duration`` mapping is
        consulted. Anything other than "long" yields "short".
        """
        settings = self.config.get('notification')
        if not isinstance(settings, dict):
            settings = {}
        chosen = settings.get('default_duration')
        if chosen is None:
            per_type = settings.get('duration')
            if isinstance(per_type, dict):
                chosen = per_type.get(notification_type)
        return "long" if chosen == "long" else "short"

    def show_notification(self, title, message, notification_type='info'):
        """Record a notification and display it in the active mode.

        Updates statistics and history first, then shows a system toast or a
        custom popup. All errors are logged and swallowed so that a display
        failure never breaks the caller.

        Args:
            title: Notification title.
            message: Notification body.
            notification_type: Category used for statistics and duration.
        """
        try:
            self.logger.info(f"准备显示通知: {title} - {message} (类型: {notification_type})")
            # Statistics and history are updated before anything is shown.
            self.update_stats(notification_type)
            self.history.append({
                'timestamp': datetime.now().isoformat(),
                'title': title,
                'message': message,
                'type': notification_type
            })
            self.save_history()

            # The default configuration has no 'default_duration' entry, so
            # the duration is resolved with fallbacks instead of a bare lookup.
            duration = self._resolve_duration(notification_type)
            if self.notification_mode == 'system':
                notification = Notification(
                    app_id="Windows App",
                    title=title,
                    msg=message,
                    duration=duration
                )
                notification.show()
                self.logger.info(f"系统通知已发送 (持续时间: {duration})")
            else:
                duration_ms = 10000 if duration == 'long' else 3000
                self.show_custom_notification(title, message, duration_ms)
                self.logger.info("自定义通知已发送")
        except Exception as e:
            self.logger.error(f"通知处理失败: {e}")
            self.logger.error(f"错误类型: {type(e)}")
            import traceback
            self.logger.error(f"详细错误: {traceback.format_exc()}")

    def _build_ws_url(self):
        """Return the ws:// or wss:// endpoint URL from the configuration."""
        scheme = 'wss' if self.config['security']['use_ssl'] else 'ws'
        server = self.config['server']
        return f"{scheme}://{server['host']}:{server['port']}/ws"

    def _handle_message(self, raw):
        """Parse one server message and show it as a notification.

        Malformed messages (invalid JSON or missing "title"/"message") are
        logged and skipped so that one bad message does not drop the
        connection.
        """
        try:
            data = json.loads(raw)
            title = data["title"]
            body = data["message"]
        except (ValueError, KeyError, TypeError) as e:
            self.logger.error(f"忽略格式错误的消息: {e}")
            return
        self.logger.info(f"收到新消息: {data}")
        self.show_notification(title, body, data.get("type", "info"))

    async def _receive_messages(self, websocket):
        """Process incoming messages until the socket closes or the client stops."""
        while self.running:
            try:
                message = await websocket.recv()
                if message == "ping":
                    await websocket.send("pong")
                elif message != "pong":
                    self._handle_message(message)
            except websockets.exceptions.ConnectionClosed:
                # Reflect the drop, otherwise the menu keeps saying "connected"
                # and reconnect() refuses to act.
                self.update_status("连接已断开", False)
                break

    async def _wait_before_retry(self, delay):
        """Sleep up to ``delay`` seconds; reconnect() can end the wait early."""
        event = self._reconnect_event
        event.clear()
        try:
            await asyncio.wait_for(event.wait(), timeout=delay)
        except asyncio.TimeoutError:
            pass

    async def connect_websocket(self):
        """Connect to the server and process messages, retrying until stopped.

        Authenticates with the stored key, or registers the device when no
        key exists. After every ended attempt (error, rejection or closed
        connection) the coroutine waits 5, 10, 30, then 60 seconds before the
        next one; the counter restarts after a successful connection.
        """
        retry_delays = [5, 10, 30, 60]  # seconds between attempts
        retry_count = 0
        # Created here so both belong to the loop that runs this coroutine.
        self._loop = asyncio.get_running_loop()
        self._reconnect_event = asyncio.Event()

        while self.running:
            try:
                self.update_status("正在连接...", False)
                ws_url = self._build_ws_url()
                self.logger.info(f"正在连接到服务器: {ws_url}")
                async with websockets.connect(ws_url, ping_interval=20, ping_timeout=10) as websocket:
                    if self.key:
                        self.logger.info("使用已有密钥连接")
                        await websocket.send(json.dumps({
                            'action': 'connect',
                            'key': self.key
                        }))
                    else:
                        self.logger.info("注册新客户端")
                        await websocket.send(json.dumps({
                            'action': 'register',
                            'device_name': self.config['client']['device_name']
                        }))

                    response = await websocket.recv()
                    data = json.loads(response)
                    self.logger.info(f"收到服务器响应: {data}")

                    if data.get('status') == 'success':
                        self.update_status("已连接", True)
                        if 'key' in data:
                            self.key = data['key']
                            self.save_key(self.key)
                            self.logger.info(f"获取到新密钥: {self.key[:8]}...")
                            self.show_notification(
                                "设备注册成功",
                                f"你的推送密钥: {self.key}\n请保管好此密钥"
                            )
                        self.logger.info("已连接到服务器,等待消息...")
                        retry_count = 0  # a good connection resets the back-off
                        await self._receive_messages(websocket)
                    else:
                        self.logger.error(f"连接失败: {data.get('message')}")
                        # A rejected key is discarded so the next attempt registers anew.
                        if self.key:
                            self.key = None
                            if os.path.exists(self.key_file):
                                os.remove(self.key_file)
            except Exception as e:
                self.update_status(f"连接错误: {str(e)}", False)

            if not self.running:
                break
            # Back off after every ended attempt, not only after exceptions,
            # so a rejecting or flapping server is not hammered in a tight loop.
            delay = retry_delays[min(retry_count, len(retry_delays) - 1)]
            self.logger.info(f"{delay} 秒后重试连接...")
            await self._wait_before_retry(delay)
            retry_count += 1

    def _refresh_menu(self):
        """Rebuild the tray menu when a tray icon exists."""
        icon = getattr(self, 'icon', None)
        if icon:
            icon.menu = self.create_menu()

    def update_status(self, status, connected=None):
        """Update the status text shown in the tray menu.

        Args:
            status: Short status description appended to the menu entry.
            connected: New connection flag; None keeps the current one.
        """
        if connected is not None:
            self.connected = connected
        self.status_text = f"状态: {'已连接' if self.connected else '未连接'} - {status}"
        self._refresh_menu()

    def create_menu(self):
        """Build and return the tray menu reflecting the current state."""
        duration_menu = pystray.Menu(
            pystray.MenuItem("短 (3秒)",
                             lambda: self.set_notification_duration("short")),
            pystray.MenuItem("长 (10秒)",
                             lambda: self.set_notification_duration("long"))
        )
        if self.notification_mode == 'system':
            notification_mode_text = "切换到自定义通知"
        else:
            notification_mode_text = "切换到系统通知"
        return pystray.Menu(
            pystray.MenuItem(self.status_text, lambda: None, enabled=False),
            pystray.MenuItem("复制密钥", self.copy_key_to_clipboard),
            pystray.MenuItem("查看历史", self.show_history),
            pystray.MenuItem("查看统计", self.show_stats),
            pystray.MenuItem("通知持续时间", duration_menu),
            pystray.MenuItem(notification_mode_text, self.toggle_notification_mode),
            pystray.Menu.SEPARATOR,
            pystray.MenuItem("重新连接", self.reconnect),
            pystray.MenuItem("退出", self.exit_app)
        )

    def create_tray_icon(self):
        """Create the tray icon, store it in ``self.icon`` and return it.

        Uses the image at ``client.icon_path`` when configured and present,
        otherwise a generated 64x64 gradient. If anything fails, a plain
        white icon with only an exit entry is used instead.
        """
        try:
            icon_path = self.config['client'].get('icon_path')
            if icon_path and os.path.exists(icon_path):
                image = Image.open(icon_path)
            else:
                # Default icon: a colour gradient rather than a blank square.
                image = Image.new('RGB', (64, 64))
                for y in range(64):
                    for x in range(64):
                        image.putpixel((x, y), (x * 4, y * 4, 255))
            self.icon = pystray.Icon(
                "notification_client",
                image,
                "通知客户端",
                self.create_menu()
            )
        except Exception as e:
            self.logger.error(f"创建托盘图标失败: {e}")
            # Fallback: simple white icon with a minimal menu.
            image = Image.new('RGB', (64, 64), color='white')
            menu = pystray.Menu(pystray.MenuItem("退出", self.exit_app))
            self.icon = pystray.Icon("notification_client", image, "通知客户端", menu)
        return self.icon

    def copy_key_to_clipboard(self):
        """Copy the client key to the clipboard (does nothing without a key)."""
        if self.key:
            import pyperclip
            pyperclip.copy(self.key)
            self.show_notification("复制成功", "密钥已复制到剪贴板")

    async def cleanup(self):
        """Stop the client loops and persist the history."""
        self.running = False
        self.save_history()
        # Give the other tasks a moment to observe the stop flag.
        await asyncio.sleep(1)

    def exit_app(self, icon):
        """Shut down: run cleanup, stop the tray icon and exit.

        Args:
            icon: The pystray icon passed in by the menu callback.
        """
        self.logger.info("正在关闭应用...")
        # A private loop runs the async cleanup; it is closed afterwards so
        # it does not leak.
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(self.cleanup())
        finally:
            loop.close()
        icon.stop()
        sys.exit(0)

    def reconnect(self):
        """Trigger an immediate reconnection attempt when disconnected.

        When the background connection loop is running, its retry wait is
        ended early, so no second connection loop is started and the tray
        callback is not blocked. Without a running loop the connection
        coroutine is run in a new event loop in the calling thread.
        """
        if self.connected:
            return
        self.logger.info("手动触发重连...")
        loop = getattr(self, '_loop', None)
        event = getattr(self, '_reconnect_event', None)
        if loop is not None and event is not None and loop.is_running():
            loop.call_soon_threadsafe(event.set)
            return
        # No background loop available: drive the connection from here.
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(self.connect_websocket())
        finally:
            loop.close()

    async def config_watch(self):
        """Poll the configuration file every 5 seconds and reload it on change."""
        while self.running:
            # load_config() is a no-op unless the file's mtime moved forward.
            self.load_config()
            await asyncio.sleep(5)

    def show_history(self):
        """Show the five most recent history entries as a notification."""
        if not self.history:
            self.show_notification("通知历史", "暂无通知记录")
            return
        latest = list(self.history)[-5:]
        message = "\n".join(
            f"{item['timestamp'][:19]} - {item['title']}: {item['message']}"
            for item in latest
        )
        self.show_notification("最近通知记录", message)

    def update_stats(self, notification_type, shown=True):
        """Update the counters for one notification.

        Args:
            notification_type: Category counted in ``by_type``.
            shown: False counts the notification as filtered instead.
        """
        if not shown:
            self.notification_stats['filtered'] += 1
            return
        self.notification_stats['total'] += 1
        self.notification_stats['by_type'][notification_type] += 1
        self.notification_stats['by_hour'][datetime.now().hour] += 1

    def show_stats(self):
        """Show totals and the per-type breakdown as a notification."""
        by_type_lines = "\n".join(
            f"- {t}: {c}"
            for t, c in self.notification_stats['by_type'].most_common()
        )
        stats = (
            f"总通知数: {self.notification_stats['total']}\n"
            f"已过滤: {self.notification_stats['filtered']}\n\n"
            f"类型统计:\n"
            + by_type_lines
        )
        self.show_notification("通知统计", stats)

    def set_notification_duration(self, duration_type):
        """Set the global notification duration and persist it.

        Args:
            duration_type: "short" or "long".
        """
        settings = self.config.get('notification')
        if not isinstance(settings, dict):
            # A config without a usable notification section gets one.
            settings = {}
            self.config['notification'] = settings
        settings['default_duration'] = duration_type
        self.show_notification(
            "设置已更新",
            f"通知持续时间已设置为: {duration_type}"
        )
        # Persist; a write failure is logged instead of escaping into the
        # tray callback.
        try:
            with open(self.config_path, 'w', encoding='utf-8') as f:
                yaml.safe_dump(self.config, f, allow_unicode=True)
        except Exception as e:
            self.logger.error(f"保存配置失败: {e}")

    def toggle_notification_mode(self):
        """Switch between system toasts and custom popups."""
        self.notification_mode = 'system' if self.notification_mode == 'custom' else 'custom'
        mode_text = "系统通知" if self.notification_mode == 'system' else "自定义通知"
        # The menu label depends on the mode, so rebuild it right away.
        self._refresh_menu()
        self.show_notification("通知模式已更改", f"已切换为{mode_text}")

    def get_next_notification_y(self, window_height):
        """Return the Y coordinate for the next custom popup.

        With popups on screen, the slot directly below the lowest one is
        returned. Otherwise the last recorded position is reused, and only if
        there is none is the default near the bottom of the screen computed.

        Args:
            window_height: Height in pixels of the popup to be placed.
        """
        if self.notification_windows:
            lowest = max(self.notification_windows,
                         key=lambda w: w.window.winfo_y())
            return lowest.window.winfo_y() + lowest.height + self.notification_spacing

        if self.last_notification_pos:
            return self.last_notification_pos[1]

        # Only this branch needs the screen height, so the temporary Tk root
        # is created here rather than on every call.
        root = tk.Tk()
        try:
            root.withdraw()
            screen_height = root.winfo_screenheight()
        finally:
            root.destroy()
        return screen_height - window_height - 40

    def rearrange_notifications(self):
        """Restack the custom popups vertically without gaps.

        The topmost popup keeps its position; each following one is placed
        below its predecessors using their heights plus the spacing.
        """
        if not self.notification_windows:
            return
        ordered = sorted(self.notification_windows,
                         key=lambda w: w.window.winfo_y())
        next_y = ordered[0].window.winfo_y()
        for index, win in enumerate(ordered):
            if index:  # the first popup stays where it is
                x = win.window.winfo_x()
                win.window.geometry(f'+{x}+{next_y}')
            next_y += win.height + self.notification_spacing

    def run(self):
        """Start the background event loop and run the tray icon (blocking)."""
        icon = self.create_tray_icon()

        # The event loop hosts the WebSocket client and the config watcher.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self._loop = loop
        loop.create_task(self.connect_websocket())
        loop.create_task(self.config_watch())

        # The loop runs in a daemon thread; the tray icon owns this thread.
        threading.Thread(target=loop.run_forever, daemon=True).start()
        icon.run()
def parse_args(argv=None):
    """Parse the command-line options of the notification client.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``.

    Returns:
        The argparse namespace with ``config`` (default ``'config.yml'``),
        ``server`` and ``device`` (both ``None`` when not given).
    """
    parser = argparse.ArgumentParser(description='Windows 通知客户端')
    parser.add_argument('--config', default='config.yml', help='配置文件路径')
    parser.add_argument('--server', help='服务器地址 (例如: localhost:8080)')
    parser.add_argument('--device', help='设备名称')
    return parser.parse_args(argv)
# Script entry point: build the client, apply command-line overrides on top
# of the loaded configuration, then run the tray application.
if __name__ == "__main__":
    args = parse_args()
    client = NotificationClient(config_path=args.config)

    # Command-line values take precedence over the configuration file.
    if args.server:
        # Split on the LAST colon so a bare host name (no port) and hosts
        # that themselves contain colons do not break the unpacking.
        host, separator, port = args.server.rpartition(':')
        if separator:
            try:
                port_number = int(port)
            except ValueError:
                sys.exit(f"无效的服务器端口: {port!r}")
            client.config['server']['host'] = host
            client.config['server']['port'] = port_number
        else:
            # Host only: keep the configured port.
            client.config['server']['host'] = args.server
    if args.device:
        client.config['client']['device_name'] = args.device

    client.run()