aboutsummaryrefslogtreecommitdiffstats
path: root/upsmartmonitor.py
blob: db8d622968513c70fb211b00a03b3694d84ad6a9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import dotenv
import datetime
import pytesseract
import subprocess
import tempfile
import pyautogui
import json
import cv2
import os

# Regions of the UPSmart screenshot that get OCR'd, keyed by output field name.
# Each value is ((x, y, width, height), decimal):
#   - the rectangle is in pixel coordinates of the image given to read_image(),
#     which crops it as im[y:y + height, x:x + width];
#   - decimal = True means the OCR text is converted to a float with
#     parse_decimal(); decimal = False keeps the stripped OCR string as-is.
ROIS = {
    "input_voltage": ((38, 182, 123, 28), True),
    "input_frequency": ((40, 238, 125, 31), True),
    "output_voltage": ((510, 180, 125, 29), True),
    "load_level": ((512, 240, 125, 28), True),
    "battery_voltage": ((146, 308, 91, 31), True),
    "temperature": ((356, 312, 92, 27), True),
    "battery_capacity": ((566, 312, 90, 26), True),
    "ups_status": ((304, 429, 87, 23), False),
    "utility_status": ((523, 427, 121, 32), False),
    "discharge_time": ((217, 385, 58, 16), False)
}

def get_window_geometries():
    """List every window reported by ``wmctrl -l -G``.

    Returns:
        A list with one entry per non-blank output line. Each entry is a list
        of seven strings: the first six whitespace-separated columns of the
        line, followed by the window title (columns eight onward, re-joined
        with single spaces). Column seven is dropped.

        NOTE(review): per wmctrl(1) the columns are window id, desktop, x, y,
        width, height, client machine, title -- confirm against the installed
        version.
    """
    listing = subprocess.run(
        ["wmctrl", "-l", "-G"], stdout = subprocess.PIPE
    ).stdout.decode()

    geometries = []
    # splitlines() does not depend on the output ending with a newline, so the
    # last window is never dropped.
    for line in listing.splitlines():
        columns = line.split()
        if not columns:
            # A blank line carries no window; skip it instead of emitting [""].
            continue
        geometries.append(columns[:6] + [" ".join(columns[7:])])
    return geometries

def find_upsmart_geometry():
    """Return the first window entry whose title is exactly "UPSmart".

    Returns:
        The matching entry from get_window_geometries() (title is its last item).

    Raises:
        Exception: if no listed window carries that title.
    """
    candidates = (
        entry for entry in get_window_geometries() if entry[-1] == "UPSmart"
    )
    match = next(candidates, None)
    if match is None:
        raise Exception("Couldn't find UPSMart application")
    return match

def focus_upsmart():
    """Focus the UPSmart window by moving the mouse to it and clicking.

    The click target is taken from items 2 and 3 of the window entry returned
    by find_upsmart_geometry(), converted to integers.
    """
    # Focusing through an API such as wmctrl would be much nicer, but that
    # doesn't seem to work with UPSmart specifically, perhaps because the
    # application has to be started as root.
    window = find_upsmart_geometry()
    target_x = int(window[2])
    target_y = int(window[3])

    pyautogui.moveTo(target_x, target_y, duration = 0.5)
    pyautogui.click()

def parse_decimal(s):
    """Turn a piece of OCR text into a float.

    Commas are treated as decimal points; every character that is neither a
    digit nor a point is discarded, and the remaining characters are joined
    and handed to float().

    Raises:
        ValueError: if what remains is not a valid float literal (for example
            when no digit is left).
    """
    kept = []
    for ch in s.replace(",", "."):
        if ch == "." or ch.isdigit():
            kept.append(ch)
    return float("".join(kept))

def read_image(im_path):
    """OCR every region listed in ROIS from a UPSmart screenshot.

    Args:
        im_path: path of the image file to load with OpenCV.

    Returns:
        A dict with one entry per ROIS key: a float for regions flagged as
        decimal, the stripped OCR text otherwise. Two entries are derived:
        "on_battery" is 0 when the "utility_status" text is exactly
        'Utility Normal' and 1 otherwise, and "discharge_time" is converted
        from colon-separated hours:minutes:seconds text to a whole number of
        seconds.

    Raises:
        OSError: if the image cannot be loaded.
        ValueError: if a decimal field or the discharge time cannot be parsed.
    """
    im = cv2.imread(im_path)
    if im is None:
        # cv2.imread reports a missing/unreadable file by returning None
        # rather than raising; fail here with a clear message.
        raise OSError("Couldn't read image %r" % im_path)

    fields = {}
    for field, (rect, decimal) in ROIS.items():
        x, y, width, height = rect
        # Images are indexed [row, column], i.e. [y, x].
        im_crop = im[y:y + height, x:x + width]
        ocr = pytesseract.image_to_string(im_crop).strip()
        fields[field] = parse_decimal(ocr) if decimal else ocr

    fields["on_battery"] = int(fields["utility_status"] != 'Utility Normal')

    parts = [int(i) for i in fields["discharge_time"].split(":")]
    remaining = datetime.timedelta(**dict(zip(["hours", "minutes", "seconds"], parts)))
    # total_seconds() instead of .seconds: the latter leaves out whole days,
    # so a reading of 24 hours or more would wrap around.
    fields["discharge_time"] = int(remaining.total_seconds())
    return fields

def influx_write(fields):
    """Write one "upsmartmonitor" point with the given fields to InfluxDB.

    The server is reached at http://INFLUXDB_HOST:8086; the token,
    organisation and bucket are read from the DOCKER_INFLUXDB_INIT_ADMIN_TOKEN,
    DOCKER_INFLUXDB_INIT_ORG and DOCKER_INFLUXDB_INIT_BUCKET environment
    variables.

    Args:
        fields: dict of field name -> value stored on the point.

    Raises:
        KeyError: if one of the environment variables is missing.
        ConnectionError: if the server does not answer the ping.
    """
    # The context manager closes the client (and its HTTP resources) even
    # when the write fails.
    with InfluxDBClient(
        url = "http://%s:8086" % INFLUXDB_HOST,
        token = os.environ["DOCKER_INFLUXDB_INIT_ADMIN_TOKEN"],
        org = os.environ["DOCKER_INFLUXDB_INIT_ORG"]
    ) as influxc:
        # ping() returns a boolean instead of raising, so it must be checked.
        if not influxc.ping():
            raise ConnectionError("InfluxDB at %s:8086 is not reachable" % INFLUXDB_HOST)

        write_api = influxc.write_api(write_options = SYNCHRONOUS)
        write_api.write(
            os.environ["DOCKER_INFLUXDB_INIT_BUCKET"],
            os.environ["DOCKER_INFLUXDB_INIT_ORG"],
            [{
                "measurement": "upsmartmonitor",
                "fields": fields
            }],
            write_precision = WritePrecision.S
        )

def main():
    """OCR the UPSmart readings, print them as JSON and write them to InfluxDB.

    NOTE(review): the live-capture path (focus the window, take a
    gnome-screenshot into the temporary directory) is commented out below, so
    this currently always reads "upsmart.jpg" relative to the working
    directory and the temporary directory goes unused -- confirm whether that
    is intentional.
    """
    # focus_upsmart()

    with tempfile.TemporaryDirectory() as td:
        # Live capture (disabled): screenshot the focused window into td.
        # fp = os.path.join(td, "upsmart.png")
        # subprocess.run(["gnome-screenshot", "--file=%s" % fp, "-w"])

        # Fixed sample image used in place of the live screenshot.
        fields = read_image("upsmart.jpg")
        # fields = read_image(fp)
        print(json.dumps(fields, indent = 4))
        influx_write(fields)


# Host of the InfluxDB server that influx_write() connects to (on port 8086).
# Defined unconditionally at module level so it exists whether or not
# config.env is present, and also when this module is imported.
INFLUXDB_HOST = "192.168.69.5"


if __name__ == "__main__":
    # Load environment variables from a config.env file sitting next to this
    # script when there is one; otherwise the variables read by influx_write()
    # must already be set in the environment.
    env_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.env")
    if os.path.exists(env_path):
        dotenv.load_dotenv(dotenv_path = env_path)

    main()