import os import sqlite3 from urllib import parse import requests from dotenv import load_dotenv from util import setupLogger, getDbConn load_dotenv() logger = setupLogger() class Bot: def __init__(self): self.token = os.getenv('BOT_TOKEN') self.chatId = os.getenv('BOT_CHAT_ID') self.isBusy = False self.hasNewMessages = False self.messages = list() def sendMessage(self, message, log=True): message = parse.quote_plus(message) url = f'https://api.telegram.org/bot{self.token}/sendMessage?chat_id={self.chatId}&parse_mode=html&text={message}' response = requests.get(url) if log: if response.status_code == 200: logger.info(f'Message sent: {response.text}') else: logger.error(f'Error sending message: {response.text}') def _dbUpdate(self, data): conn = getDbConn() cur = conn.cursor() for data in data['result']: updateId = data['update_id'] msg = data['message'] msgId = msg['message_id'] msgDate = msg['date'] msgFrom = msg['from']['username'] text = msg['text'] res = cur.execute(f"SELECT id, is_read FROM incoming WHERE id = {msgId}") if not res.fetchall(): cur.execute( f"INSERT INTO incoming VALUES ({msgId}, {msgDate}, '{msgFrom}', '{text}', False, {updateId})") self.hasNewMessages = True conn.commit() conn.close() def _checkUnreadDb(self): try: if not self.hasNewMessages: conn = getDbConn() cur = conn.cursor() res = cur.execute(f"SELECT id, is_read FROM incoming WHERE is_read = 0") if len(res.fetchall()) > 0: self.hasNewMessages = True conn.close() except sqlite3.Error as e: logger.error(e) def pollUpdate(self): self.isBusy = True try: url = f'https://api.telegram.org/bot{self.token}/getUpdates' response = requests.get(url) if response.status_code == 200: self._dbUpdate(response.json()) # Clean queue after reading because apparently it has maximum size and can block newer messages self.cleanTelegramApiQueue() except requests.exceptions.ConnectionError as e: logger.error("Connection error during polling update") self._checkUnreadDb() self.isBusy = False def getNewMessages(self): result = list() try: if not self.hasNewMessages: return result conn = getDbConn() cur = conn.cursor() resCheck = cur.execute(f"SELECT text FROM incoming WHERE is_read = False") for row in resCheck.fetchall(): result.append(row[0]) cur.execute(f"UPDATE incoming SET is_read = True WHERE is_read = False") conn.commit() conn.close() self.hasNewMessages = False except sqlite3.Error as e: logger.error(e) finally: return result def cleanTelegramApiQueue(self): try: baseGetUpdateUrl = f'https://api.telegram.org/bot{self.token}/getUpdates' response = requests.get(f'{baseGetUpdateUrl}?offset=-1') data = response.json() if data['result']: lastUpdateId = data['result'][0]['update_id'] # dapet trik dari stackoverflow # https://stackoverflow.com/questions/61976560/how-to-delete-queue-updates-in-telegram-api requests.get(f'{baseGetUpdateUrl}?offset={lastUpdateId + 1}') except requests.exceptions.ConnectionError as e: logger.error("Connection error during polling update") if __name__ == '__main__': bot = Bot() bot.getNewMessages()