77 lines
2.2 KiB
Python
77 lines
2.2 KiB
Python
#############################################
|
|
#
|
|
# 1. Check Permissions.
|
|
# 2. IP address validation.
|
|
# 3. Notify detect change.add()
|
|
# 4. Add disable handler
|
|
#############################################
|
|
|
|
from os import getenv
|
|
import re
|
|
from telethon import TelegramClient, events, custom
|
|
from telethon.tl.custom.button import Button
|
|
from telethon.tl.custom.message import Message
|
|
from datetime import datetime, timedelta, time
|
|
import asyncio
|
|
from pinger import Pinger
|
|
from dotenv import load_dotenv
|
|
|
|
|
|
IP_ADDRESS_REGEX = '^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)(\.(?!$)|$)){4}$'
|
|
|
|
|
|
def main():
|
|
load_dotenv()
|
|
pinger = Pinger()
|
|
|
|
client = TelegramClient("pinger", getenv("API_ID"), getenv("API_HASH"))
|
|
|
|
|
|
async def send_telegram_message(result):
|
|
# print(result)
|
|
await client.send_message(383724428, str(result))
|
|
|
|
async def check_pings():
|
|
await pinger.start(send_telegram_message)
|
|
|
|
@client.on(events.NewMessage(pattern="/start"))
|
|
async def start(event:Message):
|
|
if event.is_private:
|
|
await event.respond("שלום")
|
|
|
|
# @client.on(events.NewMessage(pattern="/help"))
|
|
# async def help(event:Message):
|
|
# asyncio.create_task(help_message(event))
|
|
|
|
@client.on(events.NewMessage(pattern="/enable"))
|
|
async def enable(event:Message):
|
|
if event.is_private:
|
|
asyncio.create_task(check_pings())
|
|
await event.reply("Enable")
|
|
|
|
@client.on(events.NewMessage(pattern="/disable"))
|
|
async def disable(event:Message):
|
|
if event.is_private:
|
|
await pinger.stop()
|
|
await event.reply("Stopping...")
|
|
|
|
@client.on(events.NewMessage(pattern="/add_ip"))
|
|
async def add_ip(event:Message):
|
|
if event.is_private:
|
|
ip = event.raw_text.split(" ")[1]
|
|
if re.search(ip):
|
|
await pinger.add_new_ip(event.raw_text.split(" ")[1])
|
|
|
|
@client.on(events.NewMessage(pattern="/del_ip"))
|
|
async def del_ip(event:Message):
|
|
if event.is_private:
|
|
await pinger.del_ip(event.raw_text.split(" ")[1])
|
|
|
|
client.start(bot_token=f'{getenv("BOT_TOKEN")}')
|
|
print("Client was started!")
|
|
client.loop.create_task(check_pings())
|
|
client.run_until_disconnected()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |