Files
ping-telegram-bot/pinger.py
2023-01-10 15:40:35 +02:00

78 lines
2.1 KiB
Python

import asyncio
class Pinger():
def __init__(self):
self.enabled = False
self.ips_list = []
self.ips_running_list = []
async def add_new_ip(self, ip):
self.ips_list.append(ip)
async def del_ip(self, ip):
self.ips_list.remove(ip)
async def start(self, fn):
self.enabled = True
asyncio.create_task(self.ping_all(fn))
async def stop(self):
self.enabled = False
while len(self.ips_running_list) > 0:
await asyncio.sleep(0)
async def notify_ping(self, ip, fn):
success = False
cmd = f"ping -c 1 -W 5 -q {ip}"
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
# print(f'[{cmd!r} exited with {proc.returncode}]')
if stdout:
output = f'[stdout]\n{stdout.decode()}'
success = bool(int((output.split(",")[1].strip()[0])))
# if stderr:
# print(f'[stderr]\n{stderr.decode()}')
await fn(str({"ip" : ip, "success": success}))
return ip
async def ping_all(self, fn):
def on_done(task_ip: asyncio.Task):
ip = task_ip.result()
self.ips_running_list.remove(ip)
while self.enabled:
for ip in self.ips_list:
if ip not in self.ips_running_list:
task = asyncio.create_task(self.notify_ping(ip, fn))
self.ips_running_list.append(ip)
task.add_done_callback(on_done)
else:
await asyncio.sleep(0)
await asyncio.sleep(0)
async def output_print(result):
print(result)
async def main():
p = Pinger()
await p.add_new_ip("192.168.14.121")
await p.add_new_ip("8.8.8.8")
await p.start(output_print)
await asyncio.sleep(20)
await p.stop()
print("Finished!")
if __name__ == "__main__":
asyncio.run(main())