You are not logged in.

#1 2025-01-30 13:48:21

zse
Member
Registered: 2024-05-28
Posts: 27

Journal warnings and errors notification python script

This is an alternative to journalctl-desktop-notification.

I like it better, as it is intended to be run as a regular user process, not a system daemon (no need to install anything, I simply start the script with XDG Autostart when I log in) and since it is a single file it can be more easily hacked to suit one's need or fancy.

Required packages are python-dbus and python-systemd. Modify the IGNORE_MAP variable to filter out expected messages (I've left it the way I use it).

#!/usr/bin/env python3

from systemd import journal

import dbus
import os
import re
import time


IGNORE_MAP = {
    "kernel": [
            "^ACPI: .*: Overriding _PRW sleep state",
            "^hub 6-0:1\\.0: config failed, hub doesn't have any ports! \\(err -19\\)$",
            "^i8042: PNP: PS/2 appears to have AUX port disabled",
            "^Bluetooth: hci0: HCI Enhanced Setup Synchronous Connection command is advertised, but not supported\\.$",
            "^block nvme0n1: No UUID available providing old NGUID$",
            "^warning: `plasmashell' uses wireless extensions which will stop working for Wi-Fi 7 hardware"
    ],
    "(udev-worker)": ["^controlC2: Process '/usr/bin/alsactl restore 2' failed with exit code 99"],
    "bolt.service": ["udev: failed to determine if uid is stable: unknown NHI PCI id '0x162e'"],
    "dbus-broker-launch": ["^Service file .* is not named after the D-Bus name"],
    "sddm-greeter-qt6": ["^file:", "^qrc:"],
    "org_kde_powerdevil": ["Testing for supported feature 0x10 returned Error_Info"],
    "user@1000.service": ["^s-monitors: Failed to create 'api.alsa.acp.device' device$", "^default: Failed to get percentage from UPower:"]
}

MIN_WAIT_BETWEEN_NOTIFY = 10


APP_NAME = os.path.basename(__file__)

PRIO_INDEX_CRITICAL = 0
PRIO_INDEX_ERROR = 1
PRIO_INDEX_WARNING = 2

SRC_KERNEL = "kernel"
SRC_SERVICE = "service"


def notify(summary, body, is_error = False):
    BUS_NAME = 'org.freedesktop.Notifications'
    OBJ_PATH = '/org/freedesktop/Notifications'
    IFACE_NAME = BUS_NAME
    bus = dbus.SessionBus()
    notify_obj = bus.get_object(BUS_NAME, OBJ_PATH)
    notifications = dbus.Interface(notify_obj, IFACE_NAME)

    app_icon = "dialog-error" if is_error else "dialog-warning"
    hints = { "urgency": 1, "category": "device" }

    notifications.Notify(APP_NAME, 0, app_icon, summary, body , [], hints, -1)
    time.sleep(MIN_WAIT_BETWEEN_NOTIFY)


def ignore_message(ignore_map, entry_src, entry_msg):
    if entry_src in ignore_map:
        for f in ignore_map[entry_src]:
            if f.search(entry_msg):
                return True
    return False


def prio_index_from_entry(entry):
    p = entry["PRIORITY"]
    if p == journal.LOG_WARNING:
        return PRIO_INDEX_WARNING
    if p == journal.LOG_ERR:
        return PRIO_INDEX_ERROR
    return PRIO_INDEX_CRITICAL

def single_entry_prio(counter):
    for src in counter:
        if counter[src][PRIO_INDEX_CRITICAL] == 1:
            return "critical"
        if counter[src][PRIO_INDEX_ERROR] == 1:
            return "error"
        if counter[src][PRIO_INDEX_WARNING] == 1:
            return "warning"

def has_error_or_higher(counter):
    for src in counter:
        if counter[src][PRIO_INDEX_CRITICAL] > 0 or counter[src][PRIO_INDEX_ERROR] > 0:
            return True
    return False

def worst_message_from_counter(counter):
    if counter[SRC_KERNEL][PRIO_INDEX_CRITICAL] > 0:
        return "Critical kernel error"
    if counter[SRC_KERNEL][PRIO_INDEX_ERROR] > 0:
        return "Kernel error"
    if counter[SRC_SERVICE][PRIO_INDEX_CRITICAL] > 0:
        return "Critical service error"
    if counter[SRC_SERVICE][PRIO_INDEX_ERROR] > 0:
        return "Service error"
    if counter[SRC_KERNEL][PRIO_INDEX_WARNING] > 0:
        return "Kernel warning"
    if counter[SRC_SERVICE][PRIO_INDEX_WARNING] > 0:
        return "Service warning"


def monitor_journal():
    TRACE_START_PATTERN = re.compile('^------------\\[ cut here \\]------------$')
    TRACE_END_PATTERN = re.compile('^---\\[ end trace [0-9a-f]{16} \\]---$')
    ignore_map = {}
    for e in IGNORE_MAP:
        ignore_map[e] = list(map(lambda a : re.compile(a), IGNORE_MAP[e]))

    log = journal.Reader()
    log.this_boot()
    log.log_level(journal.LOG_WARNING)

    skipping_trace = False

    time.sleep(MIN_WAIT_BETWEEN_NOTIFY)

    while True:
        log.wait()

        first_unfiltered_src = None
        first_unfiltered_msg = None
        trace_count = 0
        counter = { SRC_KERNEL: [0, 0, 0], SRC_SERVICE: [0, 0, 0] }

        for entry in log:
            entry_src = entry.get("SYSLOG_IDENTIFIER", entry.get("_SYSTEMD_UNIT", "?"))
            entry_msg = entry["MESSAGE"]
            prio_index = prio_index_from_entry(entry)

            if entry_src == SRC_KERNEL:
                if skipping_trace:
                    skipping_trace = (TRACE_END_PATTERN.search(entry_msg) == None)
                    continue
                elif TRACE_START_PATTERN.search(entry_msg):
                    skipping_trace = True
                    trace_count += 1
                elif ignore_message(ignore_map, entry_src, entry_msg):
                    continue
                counter[SRC_KERNEL][prio_index] += 1
            elif ignore_message(ignore_map, entry_src, entry_msg):
                continue
            else:
                counter[SRC_SERVICE][prio_index] += 1

            if first_unfiltered_src == None:
                first_unfiltered_src = entry_src
                first_unfiltered_msg = entry_msg

        if first_unfiltered_src == None:
            continue

        summary = "-"
        body = first_unfiltered_msg
        is_error = has_error_or_higher(counter)
        k = counter[SRC_KERNEL]
        s = counter[SRC_SERVICE]
        sum_kernel = sum(k)
        sum_service = sum(s)
        if sum_kernel + sum_service == 1:
            summary = "One {} message from {}".format(single_entry_prio(counter), first_unfiltered_src)
        else:
            body = ""
            if sum_kernel > 0:
                body += "Kernel: {} critical, {} error, {} warning, {} trace\n".format(
                    k[PRIO_INDEX_CRITICAL], k[PRIO_INDEX_ERROR], k[PRIO_INDEX_WARNING], trace_count)
            if sum_service > 0:
                body += "Service: {} critical, {} error, {} warning\n".format(
                    s[PRIO_INDEX_CRITICAL], s[PRIO_INDEX_ERROR], s[PRIO_INDEX_WARNING])
            body += "First from {}:\n{}".format(first_unfiltered_src, first_unfiltered_msg)
            summary = "{} (and more)".format(worst_message_from_counter(counter))

        notify(summary, body, is_error)



if __name__ == "__main__":
    try:
        monitor_journal()
    except Exception as err:
        notify("Internal error", "Exception: {}".format(err), True)
        raise

Offline

Board footer

Powered by FluxBB