"""Matrix bot that reports daily praying times and sends reminders before them.

Commands (all prefixed with ``!``): ``echo``, ``times``, ``set-location`` and
``set-reminder``.  Any non-prefixed message is answered with the usage text.
All per-user state is kept in module-level dictionaries and is lost on restart.
"""
import asyncio
import datetime
import os
from typing import Dict, List, Optional

import pytz
import requests
import simplematrixbotlib as botlib
from dotenv import load_dotenv

load_dotenv()

creds = botlib.Creds(
    os.getenv("BOT_HOMESERVER"),
    os.getenv("BOT_USERNAME"),
    os.getenv("BOT_PASSWORD")
)
bot = botlib.Bot(creds)
PREFIX = '!'
utc = pytz.UTC

# Seconds to wait for the praying-times API before giving up.
REQUEST_TIMEOUT = 10
TIMES_API_URL = 'https://api.aladhan.com/v1/timingsByAddress/{}-{}-{}'

# Per-user state, keyed by the sender of the command message.
user_locations: Dict[str, str] = {}
user_reminders: Dict[str, int] = {}
user_room_ids: Dict[str, str] = {}
# Strong references to pending reminder tasks: the event loop only keeps weak
# references, so unreferenced tasks may be garbage-collected before they fire.
user_reminder_tasks: Dict[str, List[asyncio.Future]] = {}


@bot.listener.on_message_event
async def echo(room, message) -> None:
    """Answer ``!echo <text>`` by sending the arguments back, space-joined."""
    match = botlib.MessageMatch(room, message, bot, PREFIX)
    if match.is_not_from_this_bot() and match.prefix() and match.command("echo"):
        await bot.api.send_text_message(room.room_id, " ".join(match.args()))


@bot.listener.on_message_event
async def times(room, message) -> None:
    """Answer ``!times`` with today's praying times for the sender's location.

    If the sender has no stored location, or the times cannot be fetched, an
    explanatory message is sent instead.
    """
    match = botlib.MessageMatch(room, message, bot, PREFIX)
    if not (match.is_not_from_this_bot() and match.prefix() and match.command("times")):
        return

    username = message.sender
    if username not in user_locations:
        response = "Please set your location first using *!set-location City, Country*"
    else:
        try:
            # The HTTP request is blocking; run it in a worker thread so the
            # event loop keeps serving other messages meanwhile.
            prayer_times = await asyncio.to_thread(
                get_praying_times, datetime.date.today(), username
            )
        except (requests.RequestException, ValueError):
            response = "Could not fetch praying times for {}. Please check your location and try again.".format(
                user_locations[username]
            )
        else:
            response = "Today's praying times for {}:\n\n".format(user_locations[username])
            response += "\n".join(
                "{}: {}".format(key, value) for key, value in prayer_times.items()
            )
    await bot.api.send_markdown_message(room.room_id, response)


@bot.listener.on_message_event
async def usage(room, message) -> None:
    """Send the usage text in reply to any message that lacks the prefix."""
    match = botlib.MessageMatch(room, message, bot, PREFIX)
    response = """usage:
- **!set-location City, Country**
- **!times**
- **!set-reminder X** where X is the number of minutes you want to receive the reminder before praying
"""
    if match.is_not_from_this_bot() and not match.prefix():
        await bot.api.send_markdown_message(room.room_id, response)


@bot.listener.on_message_event
async def set_location(room, message) -> None:
    """Handle ``!set-location <address>``: store the address for the sender."""
    match = botlib.MessageMatch(room, message, bot, PREFIX)
    if not (match.is_not_from_this_bot() and match.prefix() and match.command("set-location")):
        return

    location = " ".join(match.args()).strip()
    if not location:
        # An empty address would be stored and later sent to the API as-is.
        response = "Please provide a location, e.g. *!set-location City, Country*"
    else:
        user_locations[message.sender] = location
        response = "Your location was set to: {}".format(location)
    await bot.api.send_markdown_message(room.room_id, response)


@bot.listener.on_message_event
async def set_reminder(room, message) -> None:
    """Handle ``!set-reminder <minutes>``: schedule reminders for the sender.

    The argument must be a non-negative integer and the sender must already
    have a stored location; otherwise an explanatory message is sent.
    """
    match = botlib.MessageMatch(room, message, bot, PREFIX)
    if not (match.is_not_from_this_bot() and match.prefix() and match.command("set-reminder")):
        return

    try:
        minutes = int(match.args()[0])
    except (IndexError, ValueError):
        # Missing or non-numeric argument: treated like any other invalid value.
        minutes = -1
    if minutes < 0:
        await bot.api.send_markdown_message(
            room.room_id,
            "Please provide the number of minutes, e.g. *!set-reminder 10*",
        )
        return

    username = message.sender
    if username not in user_locations:
        response = "You did not set your location yet."
    else:
        user_reminders[username] = minutes
        user_room_ids[username] = room.room_id
        try:
            # Fetch off the event loop, then schedule on it.
            prayer_times = await asyncio.to_thread(
                get_praying_times, datetime.date.today(), username
            )
            schedule_reminder(username, prayer_times)
        except (requests.RequestException, ValueError):
            response = "Could not schedule reminders for {}. Please check your location and try again.".format(
                user_locations[username]
            )
        else:
            response = "Your reminder was set to {} minutes before praying.".format(minutes)
    await bot.api.send_markdown_message(room.room_id, response)


def get_praying_times(date: datetime.date, username) -> Dict[str, str]:
    """Fetch the praying times for ``date`` at the stored location of ``username``.

    Args:
        date: Day to query; only its day, month and year are used.
        username: Key into ``user_locations``.

    Returns:
        The ``data.timings`` mapping of the API's JSON response.

    Raises:
        KeyError: If ``username`` has no stored location.
        requests.RequestException: On network failure, timeout or HTTP error status.
        ValueError: If the response body is not JSON or lacks ``data.timings``.
    """
    times_api_url = TIMES_API_URL.format(date.day, date.month, date.year)
    # Pass the address via ``params`` so requests URL-encodes it; a raw address
    # containing '&' or '#' would otherwise corrupt the query string.
    query = {
        'address': user_locations[username],
        'method': 7,
        'iso8601': 'true',
    }
    reply = requests.get(times_api_url, params=query, timeout=REQUEST_TIMEOUT)
    reply.raise_for_status()
    payload = reply.json()
    try:
        return payload['data']['timings']
    except (KeyError, TypeError) as err:
        raise ValueError("unexpected praying times response") from err


def schedule_reminder(username, prayer_times: Optional[Dict[str, str]] = None) -> None:
    """Schedule one reminder per praying time that is still in the future.

    Pending reminders previously scheduled for ``username`` are cancelled first,
    so repeating ``!set-reminder`` replaces reminders instead of stacking them.

    Args:
        username: Key into ``user_reminders`` / ``user_locations``.
        prayer_times: Mapping of prayer name to ISO-8601 timestamp.  When
            omitted, today's times are fetched (blocking) via
            ``get_praying_times``.
    """
    # TODO: add persistence for reminders
    if prayer_times is None:
        prayer_times = get_praying_times(datetime.date.today(), username)

    for pending in user_reminder_tasks.pop(username, []):
        pending.cancel()

    scheduled: List[asyncio.Future] = []
    # Register the list up front so tasks stay referenced even if a later
    # timestamp fails to parse.
    user_reminder_tasks[username] = scheduled

    now = datetime.datetime.now(datetime.UTC)
    lead_seconds = user_reminders[username] * 60
    for prayer, time in prayer_times.items():
        praying_time = datetime.datetime.fromisoformat(time)
        if praying_time > now:
            seconds = int((praying_time - now).total_seconds())
            message = "{} is at {}".format(prayer, praying_time.strftime("%H:%M"))
            # A reminder whose lead time has already passed fires immediately.
            delay = max(0, seconds - lead_seconds)
            scheduled.append(asyncio.ensure_future(remind(username, message, delay)))


async def remind(username, message, seconds) -> None:
    """Wait ``seconds`` and then send ``message`` to the room stored for ``username``."""
    await asyncio.sleep(seconds)
    await bot.api.send_markdown_message(user_room_ids[username], message)


bot.run()