From 20e4bb168c3103827aa373d359559a4cf3b4e018 Mon Sep 17 00:00:00 2001
From: jwansek <eddie.atten.ea29@gmail.com>
Date: Sun, 19 Nov 2023 14:44:45 +0000
Subject: Added script

---
 mikrotik.py | 99 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 99 insertions(+)
 create mode 100644 mikrotik.py

(limited to 'mikrotik.py')

diff --git a/mikrotik.py b/mikrotik.py
new file mode 100644
index 0000000..42a6eeb
--- /dev/null
+++ b/mikrotik.py
@@ -0,0 +1,99 @@
+from dataclasses import dataclass
+import threading
+import fabric
+import os
+import re
+
+@dataclass
+class MikroTikSSHDevice:
+
+    def __post_init__(self):
+        self.interfaces = {}
+        for i in os.environ["MIKROTIK_INTERFACES"].split(";"):
+            self.interfaces.__setitem__(*i.split(","))
+        self.is_being_polled = threading.Event()
+        self.interface_groups_cache = {}
+
+        self.interface_groups = []
+        temp = []
+        for i, interface_name in enumerate(self.interfaces.keys(), 1):
+            temp.append(interface_name)
+            if i % 4 == 0:
+                self.interface_groups.append(tuple(temp))
+                temp = []
+        
+        # make sure we have some cache
+        # also use as sanity-test
+        for interface_group in self.interface_groups:
+            self._poll_interface_group(interface_group)
+
+    def _get_conn(self):
+        return fabric.Connection(
+            user = os.environ["MIKROTIK_USER"],
+            host = os.environ["MIKROTIK_DEVICE"],
+            connect_kwargs = {"key_filename": os.environ["MIKROTIK_KEY_PATH"]}
+        )
+
+    def _get_interfacegroup_containing(self, interface_name):
+        for interface_group in self.interface_groups:
+            if interface_name in interface_group:
+                return interface_group
+
+    def _poll_interface_group(self, interface_group):
+        self.is_being_polled.set()
+        result = self._get_conn().run("/interface/ethernet/poe/monitor %s once" % ",".join(interface_group), hide = True)
+        self.is_being_polled.clear()
+        parsed_result = self._parse_result(result)
+        self.interface_groups_cache[interface_group] = parsed_result
+        # print("Cached group:", interface_group)
+        return parsed_result
+
+    def _parse_result(self, result):
+        r = result.stdout
+        # print(r)
+        s = [re.split(r" +", row.rstrip())[1:] for row in r.split("\r\n")][:-2]
+        out = {i: {} for i in s[0][1:]}
+        off_interfaces = set()
+        for row in s[1:]:
+            column_decrimator = 0
+            output_name = row[0][:-1]
+            # print(output_name)
+
+            for i, interface_name in enumerate(out.keys(), 0):
+                # print("off_interfaces:", off_interfaces)
+                # print(i, interface_name, row[1:][i])
+                if interface_name in off_interfaces:
+                    # print("Skipping '%s' for %s..." % (output_name, interface_name))
+                    column_decrimator += 1
+                else:
+                    out[interface_name][output_name] = row[1:][i - column_decrimator]
+
+                if output_name == "poe-out-status":
+                    if row[1:][i] != "powered-on":
+                        # print("Adding %s to off interfaces" % interface_name)
+                        off_interfaces.add(interface_name)
+        return out
+
+    # i refuse to use async programming
+    def get_interface_poe(self, interface_name):
+        interface_group = self._get_interfacegroup_containing(interface_name)
+        if self.is_being_polled.is_set():
+            result = self.interface_groups_cache[interface_group][interface_name]
+            result["cached"] = True
+        else:
+            result = self._poll_interface_group(interface_group)[interface_name]
+            result["cached"] = False
+        
+        return result
+
+if __name__ == "__main__":
+    if not os.path.exists(os.path.join("/app", ".docker")):
+        import dotenv
+        dotenv.load_dotenv(dotenv_path = "power.env")
+
+    import time
+    mikrotik = MikroTikSSHDevice()
+    print("Ready.")
+    for interface_name in mikrotik.interfaces.keys():
+        threading.Thread(target = lambda i: print(i, mikrotik.get_interface_poe(i)), args = (interface_name, )).start()
+        time.sleep(1)
\ No newline at end of file
-- 
cgit v1.2.3