diff --git a/corsair_lcd_tool.py b/corsair_lcd_tool.py new file mode 100644 index 0000000..fd8cec6 --- /dev/null +++ b/corsair_lcd_tool.py @@ -0,0 +1,550 @@ +import logging +import os +import platform +import sys +from dataclasses import dataclass +import cv2 +import hid +import numpy as np +import yaml +from PyQt6.QtCore import Qt, QTimer, QPointF, pyqtSignal, QThread, pyqtSlot, QRectF +from PyQt6.QtGui import QPixmap, QPainter, QMovie, QIcon, QTransform, QImage, QAction, QPalette, QColor +from PyQt6.QtWidgets import QApplication, QMainWindow, QPushButton, QFileDialog, QSlider, QWidget, QGraphicsScene, \ + QGraphicsView, QGraphicsPixmapItem, QSystemTrayIcon, QMenu, QStyleFactory, QGraphicsItem, QCheckBox + +try: + from led_controller_openrgb import LEDController + led_controller_enabled = True +except ImportError: + led_controller_enabled = False + +if platform.system() == 'Windows': + import winshell +elif platform.system() == 'Linux': + import subprocess + +# Set up logging +logging.basicConfig(level=logging.DEBUG) + +VID = 0x1b1c # Corsair +PID = 0x0c39 # Corsair LCD Cap for Elite Capellix coolers + + +@dataclass +class CorsairCommand: + opcode: int # 0x02 + unknown1: int # 0x05 + unknown2: int # 0x40 + is_end: bool # 0x00 or 0x01 + part_num: int # 0x0000 - 0xffff, little endian + datalen: int # 0x0000 - 0xffff, little endian + data: bytes # datalen bytes + padding up to packet size + + HEADER_SIZE = 8 + + def to_bytes(self): + return bytes([ + self.opcode, + self.unknown1, + self.unknown2, + 0x01 if self.is_end else 0x00, + ]) + \ + self.part_num.to_bytes(2, byteorder='little') + \ + self.datalen.to_bytes(2, byteorder='little') + \ + self.data + + @property + def is_start(self): + return self.part_num == 0 + + @property + def header_size(self): + return self.HEADER_SIZE + + @property + def size(self): + return self.header_size + self.datalen + + +class UpdateDeviceThread(QThread): + captureSignal: pyqtSignal = pyqtSignal() + + def __init__(self, container, main_window): + super().__init__() + self.container = container + self.main = main_window + self.device = hid.device() + try: + self.device.open(VID, PID) + except Exception as e: + logging.error(f"Error opening device: {e}") + raise + QTimer.singleShot(0, self.start_timer) + + def start_timer(self): + self.update_lcd_timer = QTimer() + self.update_lcd_timer.timeout.connect(self.update_lcd) + self.update_lcd_timer.start(int(1000 / 30)) + + def run(self): + self.exec() + + @pyqtSlot() + def update_lcd(self): + try: + image = self.main.capture_container() + + width = image.width() + height = image.height() + ptr = image.bits() + ptr.setsize(height * width * 4) + arr = np.frombuffer(ptr, np.uint8).reshape((height, width, 4)) + + arr = cv2.resize(arr, (480, 480)) + + image_data = cv2.imencode('.jpg', arr)[1].tobytes() + + self.write_command(image_data) + except Exception as e: + logging.error(f"Error updating LCD: {e}") + + def make_commands(self, data, opcode=0x02, max_len=1024): + real_max_len = max_len - CorsairCommand.HEADER_SIZE + part_num = 0 + while data: + if len(data) < real_max_len: + padded_data = data + b'\x00' * (real_max_len - len(data)) + else: + padded_data = data[:real_max_len] + datalen = min(real_max_len, len(data)) + data = data[real_max_len:] + try: + yield CorsairCommand( + opcode=opcode, + unknown1=0x05, + unknown2=0x40, + is_end=not bool(data), + part_num=part_num, + datalen=datalen, + data=padded_data, + ) + except Exception as e: + logging.error(f"Error creating CorsairCommand: {e}") + raise + part_num += 1 + + def write_command(self, data): + try: + commands = self.make_commands(data) + for command in commands: + self.device.write(command.to_bytes()) + except Exception as e: + logging.error(f"Error writing command to device: {e}") + + +class MainWindow(QMainWindow): + def __init__(self): + super().__init__() + logging.debug("Initializing MainWindow") + self.pixmap_item = QGraphicsPixmapItem() + self.pixmap_item.setPos(QPointF(0, 0)) + + self.tray_icon = QSystemTrayIcon(self) + self.tray_icon.setIcon(QIcon('icon.ico')) + + self.window_state_handler = WindowStateHandler(self) + if led_controller_enabled: + self.led_controller = LEDController(self) + else: + logging.debug("LED controller is not enabled") + + self.current_image_path = None + self.last_pixmap_pos = None + self.last_transform = None + self.last_slider_value = None + self.last_scene_rect = None + self.last_scrollbar_pos = None + + self.setWindowTitle('Corsair LCD Tool') + self.setFixedSize(600, 650) + + self.container = QWidget(self) + self.container.setGeometry(60, 20, 480, 480) + self.container.setStyleSheet("background-color: #282c34; border: 0px") + + self.scene = QGraphicsScene(self.container) + self.view = NoScrollGraphicsView(self.scene, self.container) + self.view.setGeometry(0, 0, 480, 480) + self.view.setRenderHint(QPainter.RenderHint.Antialiasing) + self.view.setHorizontalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff) + self.view.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff) + self.view.setTransformationAnchor(QGraphicsView.ViewportAnchor.AnchorUnderMouse) + + self.open_button = QPushButton("Open Image", self) + self.open_button.setGeometry(60, 540, 120, 30) + self.open_button.clicked.connect(self.open_image) + + self.tray_button = QPushButton("Minimize to Tray", self) + self.tray_button.setGeometry(240, 540, 120, 30) + self.tray_button.clicked.connect(self.window_state_handler.minimize_window) + + self.reset_button = QPushButton("Reset View", self) + self.reset_button.setGeometry(420, 540, 120, 30) + self.reset_button.clicked.connect(self.reset_image) + + self.script_path = os.path.join(os.getcwd(), 'corsair_lcd_tool.py') + + if platform.system() == "Windows": + self.startup_folder = winshell.startup() + + self.shortcut_path = os.path.join(self.startup_folder, 'corsair_lcd_tool.lnk') + else: + pass + + self.python_path = sys.executable + + self.startup_checkbox = QCheckBox("Run at startup", self) + self.startup_checkbox.setGeometry(60, 580, 120, 30) + self.startup_checkbox.clicked.connect(self.update_startup) + + self.slider = QSlider(Qt.Orientation.Horizontal, self) + self.slider.setGeometry(60, 510, 480, 20) + self.slider.setMinimum(20) + self.slider.setMaximum(180) + self.slider.setValue(100) + self.slider.setDisabled(True) + self.slider.valueChanged.connect(self.scale_image) + + restore_action = QAction("Restore", self) + restore_action.triggered.connect(self.showNormal) + + quit_action = QAction("Quit", self) + quit_action.triggered.connect(QApplication.quit) + + self.tray_menu = QMenu() + self.tray_menu.addAction(restore_action) + self.tray_menu.addAction(quit_action) + + self.tray_icon.setContextMenu(self.tray_menu) + + self.setWindowIcon(QIcon('icon.ico')) + self.movie = None + + logging.debug("MainWindow initialized") + + try: + self.updateDeviceThread = UpdateDeviceThread(self.container, self) + self.updateDeviceThread.start() + self.frames = [] + except Exception as e: + logging.error(f"Error initializing MainWindow: {e}") + raise + + QApplication.instance().setStyle(QStyleFactory.create('Fusion')) + + palette = QPalette() + palette.setColor(QPalette.ColorRole.Window, QColor('#2c313a')) + palette.setColor(QPalette.ColorRole.WindowText, QColor('white')) + palette.setColor(QPalette.ColorRole.Base, QColor('#2c313a')) + palette.setColor(QPalette.ColorRole.AlternateBase, QColor('gray')) + palette.setColor(QPalette.ColorRole.ToolTipBase, QColor('#2c313a')) + palette.setColor(QPalette.ColorRole.ToolTipText, QColor('white')) + palette.setColor(QPalette.ColorRole.Text, QColor('white')) + palette.setColor(QPalette.ColorRole.Button, QColor('#2c313a')) + palette.setColor(QPalette.ColorRole.ButtonText, QColor('white')) + palette.setColor(QPalette.ColorRole.BrightText, QColor('red')) + palette.setColor(QPalette.ColorRole.Link, QColor('#0069c0')) + palette.setColor(QPalette.ColorRole.Highlight, QColor('#0069c0')) + palette.setColor(QPalette.ColorRole.HighlightedText, QColor('black')) + QApplication.instance().setPalette(palette) + + self.save_state_handler = SaveStateHandler(self) + self.save_state_handler.load_image_state() + + if not self.startup_checkbox.isChecked(): + self.show() + else: + QTimer.singleShot(5, self.window_state_handler.minimize_window) + logging.debug("UI Initialized") + + def update_startup(self): + state = self.startup_checkbox.isChecked() + logging.debug(f"Checkbox state: {state}") + logging.debug(f"Script path: {self.script_path}") + self.save_state_handler.save_image_state() + + if platform.system() == "Windows": + logging.debug(f"Shortcut path: {self.shortcut_path}") + if state: + with winshell.shortcut(self.shortcut_path) as shortcut: + shortcut.path = self.script_path + shortcut.working_directory = os.path.dirname(self.script_path) + shortcut.description = 'Corsair LCD Tool' + logging.debug("Shortcut created.") + else: + if os.path.exists(self.shortcut_path): + os.remove(self.shortcut_path) + logging.debug("Shortcut removed.") + elif platform.system() == "Linux": + service_content = f"""[Unit] + + Description=Corsair LCD Tool + + [Service] + ExecStart={self.python_path} {self.script_path} + + [Install] + WantedBy=default.target + """ + service_path = os.path.join(os.path.expanduser("~"), ".config/systemd/user", f"{os.path.basename(self.script_path)}.service") + if state: + with open(service_path, 'w') as f: + f.write(service_content) + subprocess.run(["systemctl", "--user", "daemon-reload"], check=True) + subprocess.run(["systemctl", "--user", "enable", os.path.basename(self.script_path)], check=True) + logging.debug("Systemd user service created and enabled.") + else: + if os.path.exists(service_path): + os.remove(service_path) + subprocess.run(["systemctl", "--user", "daemon-reload"], check=True) + logging.debug("Systemd user service removed.") + + def open_image(self): + logging.debug("Entering open_image function") + file_name, _ = QFileDialog.getOpenFileName(self, "Open Image", "", + "Images (*.png *.xpm *.jpg *.bmp *.gif)") + if file_name: + logging.debug(f"Opening image file: {file_name}") + self.load_new_image(file_name) + self.current_image_path = file_name + self.save_state_handler.restart_save_image_state_timer() + + def load_new_image(self, file_name): + logging.debug(f"Loading a new image: {file_name}") + + self.scene.clear() + + if file_name.lower().endswith('.gif'): + self.load_new_gif(file_name) + else: + if self.movie is not None: + self.movie.stop() + self.movie.deleteLater() + self.movie = None + + pixmap = QPixmap(file_name) + + self.pixmap_item = QGraphicsPixmapItem(pixmap) + + self.pixmap_item.setFlag(QGraphicsItem.GraphicsItemFlag.ItemIsMovable) + + self.scene.addItem(self.pixmap_item) + + self.slider.setEnabled(True) + + def load_new_gif(self, file_name): + if self.movie is not None: + self.movie.stop() + self.movie.deleteLater() + + self.movie = QMovie(file_name) + + self.pixmap_item = QGraphicsPixmapItem() + + self.pixmap_item.setFlag(QGraphicsItem.GraphicsItemFlag.ItemIsMovable) + + self.scene.addItem(self.pixmap_item) + + self.movie.frameChanged.connect(lambda: self.pixmap_item.setPixmap(QPixmap.fromImage(self.movie.currentImage()))) + + self.movie.start() + + self.slider.setEnabled(True) + + def reset_image(self): + if self.current_image_path is not None: + self.load_new_image(self.current_image_path) + + def scale_image(self, value): + if self.pixmap_item is not None: + self.pixmap_item.resetTransform() + scale_factor = value / 100.0 + + self.pixmap_item.setTransformOriginPoint(self.pixmap_item.boundingRect().center()) + + self.pixmap_item.setScale(scale_factor) + self.save_state_handler.restart_save_image_state_timer() + + def capture_container(self): + try: + pixmap = QPixmap(self.container.size()) + self.container.render(pixmap) + return pixmap.toImage() + except Exception as e: + logging.error(f"Error in capture_container: {e}") + return QImage() + + +class SaveStateHandler: + def __init__(self, main_window): + self.old_pos = None + self.main = main_window + self.is_first_load = True + self.check_state_timer = QTimer() + self.check_state_timer.setInterval(1000) + self.check_state_timer.timeout.connect(self.check_state) + self.save_image_state_timer = QTimer() + self.save_image_state_timer.setInterval(2000) + self.save_image_state_timer.timeout.connect(self.handle_save_image_state_timeout) + self.save_image_state_flag = False + self.old_transform = None + + def load_image_state(self): + logging.debug("Entering load_image_state function") + try: + logging.debug(f"self.is_first_load value: {self.is_first_load}") + if not self.is_first_load: + logging.debug("State has already been loaded. Skipping load.") + return + + state_file = 'state.yaml' + if os.path.exists(state_file): + try: + with open(state_file, 'r') as f: + state = yaml.safe_load(f) + logging.debug(f"Loaded state from file: {state}") + + if state is not None: + image_path = state.get('lastImagePath', '') + if image_path and os.path.exists(image_path): + self.main.load_new_image(image_path) + self.main.current_image_path = image_path + self.main.view.resetTransform() + self.main.scene.setSceneRect(QRectF(*state.get('last_scene_rect'))) + self.main.pixmap_item.setPos(QPointF(*state.get('last_pixmap_pos'))) + self.main.view.setTransform(QTransform().scale(*state.get('last_transform'))) + self.main.slider.setValue(state.get('last_slider_value')) + self.main.view.horizontalScrollBar().setValue(state.get('last_scrollbar_pos')[0]) + self.main.view.verticalScrollBar().setValue(state.get('last_scrollbar_pos')[1]) + self.main.startup_checkbox.setChecked(state.get('startup_checkbox_state', False)) + logging.debug(f"Loaded state: {state}") + else: + logging.warning( + "State file is empty or image path does not exist. Loading with default settings.") + else: + logging.warning("No state file found. Loading with default settings.") + except yaml.YAMLError as e: + logging.error( + f"Error loading state: Invalid YAML file. Loading with default settings. Error details: {e}") + except Exception as e: + logging.error(f"Error loading state: {e}") + + self.is_first_load = False + self.check_state_timer.start() + logging.debug('check_state_timer started') + + def save_image_state(self): + try: + if self.is_first_load: + logging.debug("State loading is not complete. Skipping save_image_state.") + return + else: + state = { + 'lastImagePath': self.main.current_image_path, + 'last_pixmap_pos': [self.main.pixmap_item.pos().x(), self.main.pixmap_item.pos().y()], + 'last_transform': [self.main.view.transform().m11(), self.main.view.transform().m22()], + 'last_slider_value': self.main.slider.value(), + 'last_scene_rect': [self.main.scene.sceneRect().x(), self.main.scene.sceneRect().y(), + self.main.scene.sceneRect().width(), self.main.scene.sceneRect().height()], + 'last_scrollbar_pos': [self.main.view.horizontalScrollBar().value(), + self.main.view.verticalScrollBar().value()], + 'startup_checkbox_state': self.main.startup_checkbox.isChecked() + } + with open('state.yaml', 'w') as file: + yaml.dump(state, file) + logging.debug(f"Saved state to disk: {state}") + except Exception as e: + logging.error(f"An error occurred while saving the image state: {e}") + + def check_state(self): + try: + newPos = self.main.pixmap_item.pos() + newTransform = self.main.view.transform() + if newPos is not None and newTransform is not None: + if newPos != self.old_pos or newTransform != self.old_transform: + if not self.is_first_load: + self.restart_save_image_state_timer() + self.old_pos = newPos + self.old_transform = newTransform + else: + logging.warning("Either pixmap position or view transform is None.") + except Exception as e: + logging.error(f"Error in check_state: {e}") + + def handle_save_image_state_timeout(self): + try: + if self.save_image_state_flag: + self.save_image_state() + self.save_image_state_flag = False + except Exception as e: + logging.error(f"Error handling save state timeout: {e}") + + def restart_save_image_state_timer(self): + try: + if self.is_first_load: + logging.debug("restart_save_image_state_timer skipped because is_first_load is True.") + return + if self.save_image_state_timer.isActive(): + self.save_image_state_timer.stop() + self.save_image_state_timer.start() + self.save_image_state_flag = True + logging.debug(f"restart_save_image_state_timer called") + except Exception as e: + logging.error(f"Error in restart_save_image_state_timer: {e}") + + +class WindowStateHandler: + def __init__(self, main_window): + self.main = main_window + self.main.tray_icon.activated.connect(self.handle_tray_activation) + + def handle_tray_activation(self, reason): + if reason == QSystemTrayIcon.ActivationReason.Trigger: + logging.debug("System tray icon activated. Restoring window.") + self.restore_window() + + def restore_window(self): + try: + self.main.show() + except Exception as e: + logging.error(f"Error in restore_window: {e}") + + def minimize_window(self): + try: + logging.debug("Minimizing window to system tray.") + self.main.hide() + self.main.tray_icon.show() + except Exception as e: + logging.error(f"Error in minimize_window: {e}") + + +class NoScrollGraphicsView(QGraphicsView): + def __init__(self, scene, parent=None): + super().__init__(scene, parent) + self.setViewportUpdateMode(QGraphicsView.ViewportUpdateMode.FullViewportUpdate) + + def scroll_contents_by(self, dx, dy): + pass + + def wheelEvent(self, event): + pass + + +if __name__ == "__main__": + try: + app = QApplication(sys.argv) + window = MainWindow() + window.show() + sys.exit(app.exec()) + except Exception as e: + logging.error(f"Error in main: {e}") diff --git a/icon.ico b/icon.ico new file mode 100644 index 0000000..c0ef798 Binary files /dev/null and b/icon.ico differ diff --git a/led_controller_openrgb.py b/led_controller_openrgb.py new file mode 100644 index 0000000..34f3e84 --- /dev/null +++ b/led_controller_openrgb.py @@ -0,0 +1,116 @@ +import atexit +import math +import time +import logging +from PyQt6.QtCore import QThread, pyqtSignal, QTimer, QObject +from PyQt6.QtGui import QColor +from PyQt6.QtWidgets import QMessageBox +from openrgb import OpenRGBClient +from openrgb.utils import RGBColor + + +class LEDController(QThread): + captureSignal = pyqtSignal() + + def __init__(self, main_window): + super().__init__() + self.main = main_window + self.enabled = True + self.client = self.connect_to_openrgb() + if not self.client: + QMessageBox.warning(None, "OpenRGB Connection Error", + "OpenRGB is required for LED control. " + "Please ensure OpenRGB is running and SDK Server is enabled.") + self.enabled = False + return + self.last_rgb_colors = [RGBColor(0, 0, 0)] * 24 + self.worker = LEDWorker(self.client, self.last_rgb_colors, self) + + self.started.connect(self.worker.set_colors) + + QTimer.singleShot(0, self.start_timer) + + atexit.register(self.stop_led) + + def connect_to_openrgb(self): + for attempt in range(2): + try: + return OpenRGBClient() + except Exception as e: + if attempt == 0: + logging.warning("Retrying after 5 seconds...") + time.sleep(5) + return None + + def start_timer(self): + self.update_led_timer = QTimer() + self.update_led_timer.timeout.connect(self.analyze_and_set_colors) + self.update_led_timer.start(100) + + def analyze_and_set_colors(self, width=479, height=479, smoothing_factor=0.25, saturation_factor=1.25): + if not self.enabled: + return + + try: + image = self.main.capture_container() + radius = min(width, height) // 2 + + for i in range(24): + angle = 2 * math.pi * (23 - i) / 24 + x = width - int(width / 2 + radius * math.cos(angle)) + y = int(height / 2 + radius * math.sin(angle)) + + color = QColor(image.pixel(x, y)) + color = color.toHsv() + + + color.setHsv(color.hue(), min(255, int(color.saturation() * saturation_factor)), color.value()) + + rgb_color = RGBColor(color.red(), color.green(), color.blue()) + + + self.last_rgb_colors[i] = RGBColor( + int(self.last_rgb_colors[i].red * ( + 1 - smoothing_factor) + rgb_color.red * smoothing_factor), + int(self.last_rgb_colors[i].green * ( + 1 - smoothing_factor) + rgb_color.green * smoothing_factor), + int(self.last_rgb_colors[i].blue * ( + 1 - smoothing_factor) + rgb_color.blue * smoothing_factor) + ) + + self.started.emit() + except Exception as e: + logging.error(f"Error updating LED: {e}") + + def stop_led(self): + self.enabled = False + self.update_led_timer.stop() + self.worker.turn_off_leds() + + +class LEDWorker(QObject): + colorReady = pyqtSignal(object) + + def __init__(self, client, last_rgb_colors, controller): + super().__init__() + self.client = client + self.last_rgb_colors = last_rgb_colors + self.controller = controller + + def set_colors(self): + if not self.controller.enabled: + return + try: + for device in self.client.ee_devices: + if device.name == "Corsair Commander Core": + pump_zone = next(zone for zone in device.zones if zone.name == "Pump") + for i, rgb_color in enumerate(self.last_rgb_colors): + pump_zone.leds[i].set_color(rgb_color, fast=True) + except Exception as e: + logging.error(f"Error setting colors: {e}") + + def turn_off_leds(self): + time.sleep(0.1) + for device in self.client.ee_devices: + if device.name == "Corsair Commander Core": + device.clear() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..61080e4 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +hidapi +numpy +opencv-python +openrgb-python +PyQt6 +pyyaml