• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python botapi.TelegramBot类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中twx.botapi.TelegramBot的典型用法代码示例。如果您正苦于以下问题:Python TelegramBot类的具体用法?Python TelegramBot怎么用?Python TelegramBot使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了TelegramBot类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: setupBot

def setupBot(apitoken):
    """
    Setup the bot
    """
    bot = TelegramBot(apitoken)
    bot.update_bot_info().wait()
    return bot
开发者ID:realraum,项目名称:r3bot-telegram,代码行数:7,代码来源:sample.py


示例2: start

	def start(self):
		self.logger.info("%s started" % self.serviceName)
		self.retrieveHomeSettings()

		if (self.botToken is None):
			self.logger.error ("The Telegram Token is not valid")
			self.stop()
		else:
			self.bot = TelegramBot(self.botToken)
			self.homeUpdateThread = Thread (target = self.homeUpdate)
			self.homeUpdateThread.start()

			# serving clients
			try:
				while self.isRunning:
					try:
					#The get_updates method returns the earliest 100 unconfirmed updates
						updates = self.bot.get_updates(offset = self.lastUpdateID + 1).wait()
						if (updates is not None):
							cont1=len(updates)
							if cont1 != 0:
								replyThread = Thread (target = self.reply, args=[updates[-1]])
								replyThread.start()
								self.lastUpdateID = updates[-1].update_id

						time.sleep(1)
					except Exception, e:
						self.logger.error('Something wrong occurred in telegram communication: %s. restoring the bot' % (e))
						self.bot = TelegramBot(self.botToken)
						
			except KeyboardInterrupt:
				self.stop()
开发者ID:eduman,项目名称:smartHome2,代码行数:32,代码来源:TelegramService.py


示例3: __init__

class NotificationSender:
    def __init__(self, bot_token, user_id):
        self.bot = TelegramBot(bot_token)
        self.user_id = user_id

    def send_message(self, message):
        self.bot.send_message(self.user_id, message).wait()
开发者ID:Cs4r,项目名称:weather_alarm,代码行数:7,代码来源:sender.py


示例4: main

def main():
    last_post_date, tgm_data = load_settings()

    rss = feedparser.parse(settings.RSS_URL)

    logging.debug("last_post_date: %s", last_post_date.isoformat())
    # find new entries in feed
    new_entries = []
    for entry in rss.entries:
        try:
            entry_published = datetime.utcfromtimestamp(
                calendar.timegm(entry.published_parsed))
            if entry_published > last_post_date:
                new_entries.append(entry)
        except AttributeError as e:
            logging.error("%s\n%s", e, entry)

    logging.info('The number of new entries: %s\nEntries: %s',
                 len(new_entries),
                 [(item.get('id'), item.get('published_parsed')) for item in
                  new_entries])

    date = datetime.now()
    if not new_entries:
        logging.info('New entries are not found')
        save_settings(date.isoformat())
        return

    # sort new entries by published date
    new_entries.sort(key=lambda item: item.published_parsed)

    # send to telegram channel
    tgm_bot = TelegramBot(settings.TGM_BOT_ACCESS_TOKEN)
    for entry in new_entries:
        try:

            logging.debug("Raw message:\n%s\n", entry.description)
            text = entry.title + '\n\n' + entry.link + '\n\n' + entry.description
            message = remove_tags(text)
            logging.debug("message:\n%s\n", message)
        except AttributeError as e:
            logging.error("%s\n%s", e, entry)
            continue

        answer = tgm_bot.send_message(settings.TGM_CHANNEL, message).wait()
        if isinstance(answer, twx.botapi.Error):
            logging.error("error code: %s\nerror description: %s\n",
                          answer.error_code,
                          answer.description)
            break
        else:
            date = max(
                datetime.utcfromtimestamp(
                    calendar.timegm(entry.published_parsed)),
                date
            )

        time.sleep(1)

    save_settings(date.isoformat())
开发者ID:PyNSK,项目名称:RSS2Telegram,代码行数:60,代码来源:bot.py


示例5: MessageServer

class MessageServer(object):
    def __init__(self):
        self.thread_map = {}
        self.bot = TelegramBot(API_TOKEN)
        self.updates = set(self.bot.get_updates().wait())

    def poll(self):
        while True:
            del_list = []
            for name in self.thread_map:
                thread = self.thread_map[name]
                if thread.finished_event.isSet():
                    del_list.append(name)
            for x in del_list:
                del self.thread_map[x]
            print("<{time}> Waiting....".format(time=datetime.now().time()))
            updates = set(self.bot.get_updates().wait())
            new_updates = updates.difference(self.updates)
            for update in new_updates:
                print("<{time}> Received new message....".format(time=datetime.now().time()))
                user = update.message.sender.id
                if user in self.thread_map:
                    print("<{time}> Dispatching message to thread....".format(time=datetime.now().time()))
                    self.thread_map[user]._add_to_queue(update.message.text)
                else:
                    print("<{time}> Creating new thread....".format(time=datetime.now().time()))
                    user_thread = Worker(user)
                    self.thread_map[user] = user_thread
                    print("<{time}> Dispatching message to thread....".format(time=datetime.now().time()))
                    user_thread._add_to_queue(update.message.text)
                self.updates.add(update)
开发者ID:Ahuge,项目名称:TelegramBot,代码行数:31,代码来源:v2.py


示例6: main

def main():
    try:
        with open('API_KEY', 'r') as f:
            api_key = f.read().replace('\n', '')
    except IOError:
        print "please write the api key in file `API_KEY`"
        exit(1)

    bot = TelegramBot(api_key)
    bot.update_bot_info().wait()
    print(bot.username)

    last_update_id = int(0)
    while True:
        try:
            updates = bot.get_updates(offset=last_update_id+1, timeout=5).wait()
            for update in updates:
                last_update_id = update.update_id
                #print(update)
                process_update(bot, update)
        except KeyboardInterrupt:
            # Allow ctrl-c.
            raise KeyboardInterrupt
        except Exception as e:
            print "---\nException: "
            print e
            traceback.print_exc()
            pass
开发者ID:auzias,项目名称:pikon,代码行数:28,代码来源:main.py


示例7: pub_to_telegram

def pub_to_telegram(text, bot_token, tg_channel):
    tgm_bot = TelegramBot(bot_token)
    answer = tgm_bot.send_message(tg_channel, text).wait()
    if isinstance(answer, twx.botapi.Error):
        print('error code: %s\nerror description: %s\n',
              answer.error_code,
              answer.description)
    else:
        print('OK')
开发者ID:pythondigest,项目名称:pythondigest,代码行数:9,代码来源:pub_digest.py


示例8: start

def start():
 bot = TelegramBot(token)
 bot.update_bot_info().wait()
 print(bot.username)
 #foto belle -33909375 
 #user_id = int(23263342)

 #result = bot.send_message(user_id, 'Salve ragazzi').wait()
 #XXX WebHook automatico non funzionante
 result = bot.set_webhook(token)
 print('*****Il risultato:' % (result))
 return "ok"
开发者ID:kmos,项目名称:cosbytbot,代码行数:12,代码来源:flaskapp.py


示例9: send_telegram

def send_telegram(pdf):
    """
    Отсылается сообщение через telegram bot. Получатель определяется по полю user.employee.sms_notify
    :param pdf:
    :return:
    """
    d = shelve.open('shelve.db')
    import_mode = d['IMPORT_MODE']
    d.close()

    logger.info('')
    logger.info('――> Telegram:')

    if import_mode:
        logger.info('····skip due import mode')
        return None

    if pdf.upload_to_ctpbureau_status:

        if pdf.ctpbureau.name == 'Admin':
            # TODO прибито гвоздями; можно сделать в настройках что-то вроде, - пропускать, если есть стоп-слова в названии. Но опять таки - что пропускать? Аплоад? Смс? Нотификации? Если все пропускать, тогда дебажить не получится
            # debugging purpose; if outputter is Admin then telegram send only to first superuser
            receivers = Employee.objects.filter(user__is_superuser=True)
        else:
            receivers = Employee.objects.filter(telegram_notify=True)

        for each in receivers:

            telegram_id = each.telegram_id
            bot = TelegramBot(settings.TELEGRAM_API_KEY)

            message = """
№{} {}
Плит: {}, Машина: {}, Вывод: {}
""".format(pdf.order, pdf.ordername, str(pdf.plates),pdf.machines[1].name, pdf.ctpbureau.name)

            # logger.debug('telegram_id={}'.format(telegram_id))
            # logger.debug('username={}'.format(each.user.username))

            responce = bot.send_message(chat_id=telegram_id, text=message).wait()

            if isinstance(responce, twx.botapi.botapi.Message):
                logger.info('··· {} receive notify'.format(responce.chat.username))
            elif isinstance(responce, twx.botapi.botapi.Error):
                logger.error(responce)
            else:
                logger.error('Critical telegram twx bug:')
                logger.error(responce)

    else:
        # если по какой-то причине у нас не софрмирован upload_to_ctpbureau_status
        logger.warning('····telegram NOT sent. Reason: failed upload')
开发者ID:swasher,项目名称:pdfupload,代码行数:52,代码来源:action.py


示例10: TelegramBroadcaster

class TelegramBroadcaster():
    """
    Setup the bot
    """
    def __init__(self):
        self.bot = TelegramBot('<BOT_KEY>')
        self.bot.update_bot_info().wait()
        self.user_id = int(<CHANNEL_KEY>) # channel goyanglib
        print(self.bot.username)

    def send(self, message):
        result = self.bot.send_message(self.user_id, message).wait()
        print(result)
开发者ID:wooyeong,项目名称:goyanglib,代码行数:13,代码来源:goyanglib.py


示例11: __init__

 def __init__(self):
     self.bot = TelegramBot(self.token)
     self.bot.get_me()
     last_updates = self.bot.get_updates(offset=0).wait()
     try:
         self.last_update_id = list(last_updates)[-1].update_id
     except IndexError:
         self.last_update_id = None
     print('last update id: {0}'.format(self.last_update_id))
开发者ID:moskmax,项目名称:nim_telegrambot,代码行数:9,代码来源:nimbot.py


示例12: __init__

    def __init__(self, token, bot_id, my_user):
        self.token = token.strip()
        self.bot_id = int(bot_id)
        self.my_user = int(my_user)

        self.bot = TelegramBot(self.token)
        self.bot.update_bot_info().wait()

        self.queue = deque()
        self.offset = 0  # Needed for the bot.get_updates method to avoid getting duplicate updates
开发者ID:ricgu8086,项目名称:vAlfred-2.0,代码行数:10,代码来源:AdapterTelegram2Channel.py


示例13: __init__

    def __init__(self, config, interpreter):
        super().__init__()
        self.__interpreter = interpreter
        self.__logger = logging.getLogger('telegram')
        logHandler = StreamHandler()
        logHandler.setLevel(logging.DEBUG)
        self.__logger.addHandler(logHandler)

        self.__bot = TelegramBot(config['Telegram']['token'])
        self.__logger.warning("Bot details: " + str(self.__bot.get_me().wait()))

        self.__updateOffset = None
开发者ID:rpoisel,项目名称:willhaben_notify,代码行数:12,代码来源:telegram.py


示例14: Telegram

class Telegram(object):
    def __init__(self, config, interpreter):
        super().__init__()
        self.__interpreter = interpreter
        self.__logger = logging.getLogger('telegram')
        logHandler = StreamHandler()
        logHandler.setLevel(logging.DEBUG)
        self.__logger.addHandler(logHandler)

        self.__bot = TelegramBot(config['Telegram']['token'])
        self.__logger.warning("Bot details: " + str(self.__bot.get_me().wait()))

        self.__updateOffset = None

    def send_msg(self, telegram_id, text):
        self.__bot.send_message(telegram_id, text).wait()

    def getUpdates(self, dbSession):
        updates = None
        try:
            if self.__updateOffset is not None:
                updates = self.__bot.get_updates(offset=self.__updateOffset + 1).wait()
            else:
                updates = self.__bot.get_updates().wait()

            if updates is None:
                raise Exception("No updates have been received due to an error")
        except:
            return

        for update in updates:
            if hasattr(update, 'message'):
                self.__logger.warning(str(update.message.sender) + ": " + update.message.text)
                self.__interpreter.interpret(dbSession, self, update.message.sender, update.message.text.split(' '))
            if hasattr(update, 'update_id'):
                self.__updateOffset = update.update_id

    def shutdown(self):
        pass
开发者ID:rpoisel,项目名称:willhaben_notify,代码行数:39,代码来源:telegram.py


示例15: main

def main():
    print '[+] Starting bot...'

    # Read the config file
    print '[+] Reading config file...'
    config = ConfigParser.ConfigParser()
    config.read([os.path.expanduser('./config')])

    # Read data
    bot_name = config.get('bot', 'name')
    bot_token = config.get('bot', 'token')
    cool_answers_raw = config.get('phrases', 'cool_answers')
    cool_answers = [ answer for answer in cool_answers_raw.split('"') if answer and answer!=',']

    # Last mssg id:
    last_id = int(load_last_id())
    print '[+] Last id: %d' % last_id

    # Configure regex
    regex = re.compile('[%s]' % re.escape(string.punctuation))

    # Create bot
    print '[+] Connecting bot...'
    bot = TelegramBot(bot_token)
    bot.update_bot_info().wait()
    print '\tBot connected! Bot name: %s' % bot.username

    while True:
        try:
            updates = bot.get_updates(offset=last_id).wait()

            for update in updates:
                id = update.message.message_id
                update_id = update.update_id
                user = update.message.sender

                chat_id = update.message.chat.id
                text = update.message.text

                if int(update_id) > last_id:
                    last_id = update_id
                    save_last_id(last_id)
                    save_log(id, update_id, chat_id, text)

                    #text = regex.sub('', text)
                    words = text.split()

                    for word in words:
                        # Process commands:
                        if 'http://' in word or 'https://' in word:
                            # Get a random answer phrase:
                            answer = random.choice(cool_answers)
                            bot.send_message(chat_id, answer)

        except (KeyboardInterrupt, SystemExit):
            print '\nkeyboardinterrupt caught (again)'
            print '\n...Program Stopped Manually!'
            raise
开发者ID:victordiaz,项目名称:telegram-bots,代码行数:58,代码来源:BotBQ.py


示例16: __init__

    def __init__(self, user,  *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        if "timeout" in kwargs:
            self.max_timeout = kwargs["timeout"]
        else:
            self.max_timeout = 600  # 10 minute timeout by default.
        self._queue = Queue()
        self.awaiting_command = True
        self.user = user
        self.bot = TelegramBot(API_TOKEN)

        self.affirmative = ["yes", "yup", "yeah", "yea"]
        self.negative = ["no", "nope", "none", "negative"]

        self.finished_event = threading.Event()
        self.last_message_time = datetime.now()
开发者ID:Ahuge,项目名称:TelegramBot,代码行数:16,代码来源:v2.py


示例17: main

def main():
    try:
        with open('API_KEY', 'r') as f:
            api_key = f.read().replace('\n', '')
    except IOError:
        print "please write the api key in file `API_KEY`"
        exit(1)

    bot = TelegramBot(api_key)
    bot.update_bot_info().wait()
    print(bot.username)

    last_update_id = int(0)
    last_fortune = (datetime.now() - timedelta(minutes=31))
    while True:
        try:
            updates = bot.get_updates(offset=last_update_id+1, timeout=5).wait()
            for update in updates:
                last_update_id = update.update_id
                print(update)
                process_update(bot, update)
            global rems
            for rem in rems:
                if rem.when < datetime.now():
                    bot.send_message(rem.chatid, "=== RAPPEL ===\n\n" +
                            unicode(rem.what))
                    print "removing reminder " + str(rem.id)
                    rems = [r for r in rems if r.id != rem.id]
                    print "rems = " + str(rems)

            odd = random.randint(0, 100)
            thirty_minutes = (datetime.now() - datetime.timedelta(minutes=5))
            if last_fortune < thirty_minutes and 50 < odd:
                last_fortune = datetime.datetime.now()
                msg = subprocess.check_output("fortune") + ' #fortune'
                bot.send_message(update.message.chat.id, msg)
        except KeyboardInterrupt:
            # Allow ctrl-c.
            raise KeyboardInterrupt
        except Exception as e:
            print "---\nException: "
            print e
            traceback.print_exc()
            pass
开发者ID:PhD-Vannes,项目名称:nitsh,代码行数:44,代码来源:main.py


示例18: __init__

    def __init__(self):
        self.bot=None
        self.conn=None
        self.c=None
        self.mem_alert = False
        self.disk_alert = False

        # Initialize DB
        self.conn = sqlite3.connect('telegram.db')
        self.c = self.conn.cursor()
        
        # Create tables
        self.c.execute('''create table if not exists Telegram (name STRING, last_name STRING, userid STRING UNIQUE)''')

        # Initialize bot
        self.bot = TelegramBot('TELEGRAM_BOT_UNIQUE_ID_GOES_HERE')
        self.bot.update_bot_info().wait()
开发者ID:sivang,项目名称:teleops,代码行数:17,代码来源:telegram.py


示例19: YoutubeBot

class YoutubeBot(object):

    api_token = ''

    def __init__(self):
        self.bot = TelegramBot(self.api_token)
        self.bot.get_me()
        last_updates = self.bot.get_updates(offset=0).wait()
        try:
            self.last_update_id = list(last_updates)[-1].update_id
        except IndexError:
            self.last_update_id = None
        print 'last update id: {}'.format(self.last_update_id)

    def process_message(self, message):

        text = message.message.text
        chat = message.message.chat
        text = text.strip().split('&')[0]
        msg = 'Could not download {}'.format(text)
        print 'Got message: \33[0;32m{}\33[0m'.format(text)
        try:
            self.bot.send_message(chat.id, 'Staring to download')
            with youtube_dl.YoutubeDL(YDL_OPTS) as ydl:
                r_code = ydl.download([text])
                if r_code == 0:
                    msg = '{} download successfully'.format(text)
        except Exception:
            pass
        self.bot.send_message(chat.id, msg)

    def run(self):
        print 'Main loop started'
        while True:
            updates = self.bot.get_updates(
                offset=self.last_update_id).wait()
            try:
                for update in updates:
                    if update.update_id > self.last_update_id:
                        self.last_update_id = update.update_id
                        self.process_message(update)
            except Exception as ex:
                print traceback.format_exc()
开发者ID:gh0st-dog,项目名称:youtube_bot,代码行数:43,代码来源:youtube_bot.py


示例20: update

def update():
 bot = TelegramBot(token)
 bot.update_bot_info().wait()

 if 'chat' in request.json['message']:
  chat = request.json['message']['chat']['id']
  if 'text' in request.json['message']:
   text = request.json['message']['text']
   if text.find('vice') != -1:
    bot.send_message(chat, vice()).wait()
   if text.find('drink') != -1:
    bot.send_photo(chat,bill()).wait()

 #Print in console for fast Debugging
 print(json.dumps(request.json))
 return "ok"
开发者ID:kmos,项目名称:cosbytbot,代码行数:16,代码来源:flaskapp.py



注:本文中的twx.botapi.TelegramBot类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python twython.Twython类代码示例发布时间:2022-05-27
下一篇:
Python config.Config类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap