"""Scrape UPS readings from the UPSmart window via OCR and push them to InfluxDB.

The script focuses the UPSmart window, takes a screenshot of it with
``gnome-screenshot``, runs Tesseract over a fixed set of regions of interest
and writes the recognised values as one point of the ``ups_status``
measurement.
"""
import json
import os
import subprocess
import tempfile

import cv2
import dotenv
import pyautogui
import pytesseract
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

# Host of the InfluxDB instance. Defined at module level so that
# influx_write() also works when this file is imported rather than executed.
INFLUXDB_HOST = "192.168.69.5"

# field name -> ((x, y, width, height), parse_as_decimal)
# The rectangle is in pixel coordinates of the window screenshot; the flag
# selects whether the OCR text is converted to a float or kept as a string.
ROIS = {
    "input_voltage": ((38, 182, 123, 28), True),
    "input_frequency": ((40, 238, 125, 31), True),
    "output_voltage": ((510, 180, 125, 29), True),
    "load_level": ((512, 240, 125, 28), True),
    "battery_voltage": ((146, 308, 91, 31), True),
    "temperature": ((356, 312, 92, 27), True),
    "battery_capacity": ((566, 312, 90, 26), True),
    "ups_status": ((304, 429, 87, 23), False),
    "utility_status": ((523, 427, 121, 32), False),
}

# Characters kept by parse_decimal(). str.isdigit() is deliberately not used:
# it accepts characters such as superscript digits that float() cannot parse.
_DECIMAL_CHARS = frozenset("0123456789.")


def get_window_geometries():
    """Return one entry per window listed by ``wmctrl -l -G``.

    Each entry is a list of seven strings: the first six whitespace-separated
    columns of the wmctrl line followed by the window title (column eight
    onwards, re-joined with single spaces). Column seven is skipped.

    Raises:
        subprocess.CalledProcessError: if wmctrl exits with a non-zero status.
    """
    proc = subprocess.run(
        ["wmctrl", "-l", "-G"], stdout=subprocess.PIPE, check=True
    )
    geometries = []
    # splitlines() copes with output that lacks a trailing newline.
    for line in proc.stdout.decode().splitlines():
        columns = line.split()
        if not columns:
            continue
        geometries.append(columns[:6] + [" ".join(columns[7:])])
    return geometries


def find_upsmart_geometry():
    """Return the wmctrl entry of the window titled exactly ``UPSmart``.

    Raises:
        Exception: if no such window is currently listed.
    """
    for geometry in get_window_geometries():
        if geometry[-1] == "UPSmart":
            return geometry
    raise Exception("Couldn't find UPSMart application")


def focus_upsmart():
    """Give the UPSmart window focus by clicking on it with the mouse."""
    # It would be much nicer to use an API such as wmctrl to focus the
    # application, but it doesn't seem to work with UPSmart specifically,
    # perhaps because you need to start it as root.
    geom = find_upsmart_geometry()
    # Columns 2 and 3 of the wmctrl entry are used as the click position
    # (presumably the window's x/y offset - see the wmctrl man page).
    pyautogui.moveTo(int(geom[2]), int(geom[3]), duration=0.5)
    pyautogui.click()


def parse_decimal(s: str) -> float:
    """Convert OCR text such as ``"230,5 V"`` to a float.

    Commas are treated as decimal points and every character other than an
    ASCII digit or a dot is discarded before conversion.

    Raises:
        ValueError: if what remains is not a valid number (for example when
            the OCR returned no digits at all).
    """
    cleaned = "".join(
        ch for ch in s.replace(",", ".") if ch in _DECIMAL_CHARS
    )
    try:
        return float(cleaned)
    except ValueError as err:
        raise ValueError(
            "Couldn't parse a decimal number from OCR text %r" % s
        ) from err


def read_image(im_path):
    """OCR every region in ``ROIS`` from the screenshot at ``im_path``.

    Returns:
        dict mapping each ROIS field name to its value (float for decimal
        fields, stripped string otherwise) plus ``on_battery``, which is 0
        when ``utility_status`` reads exactly ``Utility Normal`` and 1
        otherwise.

    Raises:
        OSError: if the image cannot be loaded.
        ValueError: if a decimal field cannot be parsed.
    """
    im = cv2.imread(im_path)
    # cv2.imread signals failure by returning None instead of raising.
    if im is None:
        raise OSError("Couldn't read screenshot %r" % im_path)

    fields = {}
    for field, (roi, is_decimal) in ROIS.items():
        x, y, w, h = roi
        # Tesseract is run on the in-memory crop; nothing is written to disk.
        crop = im[y:y + h, x:x + w]
        text = pytesseract.image_to_string(crop).strip()
        fields[field] = parse_decimal(text) if is_decimal else text

    fields["on_battery"] = int(fields["utility_status"] != 'Utility Normal')
    return fields


def influx_write(fields):
    """Write ``fields`` as one point of the ``ups_status`` measurement.

    Connection settings come from ``INFLUXDB_HOST`` and the
    ``DOCKER_INFLUXDB_INIT_ADMIN_TOKEN`` / ``DOCKER_INFLUXDB_INIT_ORG`` /
    ``DOCKER_INFLUXDB_INIT_BUCKET`` environment variables.

    Raises:
        KeyError: if one of the environment variables is missing.
        ConnectionError: if the server does not answer the ping.
    """
    url = "http://%s:8086" % INFLUXDB_HOST
    token = os.environ["DOCKER_INFLUXDB_INIT_ADMIN_TOKEN"]
    org = os.environ["DOCKER_INFLUXDB_INIT_ORG"]
    bucket = os.environ["DOCKER_INFLUXDB_INIT_BUCKET"]

    # Context managers make sure the client and write API are closed even
    # when the write fails.
    with InfluxDBClient(url=url, token=token, org=org) as influxc:
        # ping() reports failure through its return value, so check it.
        if not influxc.ping():
            raise ConnectionError("InfluxDB at %s is not reachable" % url)
        with influxc.write_api(write_options=SYNCHRONOUS) as write_api:
            write_api.write(
                bucket,
                org,
                [{
                    "measurement": "ups_status",
                    "fields": fields,
                }],
                write_precision=WritePrecision.S,
            )


def main():
    """Focus UPSmart, screenshot it, OCR the readings and store them."""
    focus_upsmart()

    with tempfile.TemporaryDirectory() as td:
        fp = os.path.join(td, "upsmart.png")
        # -w captures the currently focused window only; check=True surfaces
        # a failed capture here instead of as an unreadable image later on.
        subprocess.run(
            ["gnome-screenshot", "--file=%s" % fp, "-w"], check=True
        )
        fields = read_image(fp)

    print(json.dumps(fields, indent=4))
    influx_write(fields)


if __name__ == "__main__":
    # Load InfluxDB credentials from a config.env next to this script, if any.
    env_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "config.env"
    )
    if os.path.exists(env_path):
        dotenv.load_dotenv(dotenv_path=env_path)

    main()