• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python rfc822.parseaddr函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 rfc822.parseaddr 函数的典型用法代码示例。如果您正苦于以下问题：Python parseaddr 函数的具体用法？Python parseaddr 怎么用？Python parseaddr 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 parseaddr 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: test_parseaddr

 def test_parseaddr(self):
     """Check rfc822.parseaddr() on the basic address forms.

     Every assertion compares the parsed result with the expected
     ``(real name, address)`` tuple.  The address literals had been
     replaced by an e-mail obfuscation placeholder, which made the
     assertions impossible to satisfy; they are restored here.
     """
     eq = self.assertEqual
     # An empty angle-addr parses to an empty name and an empty address.
     eq(rfc822.parseaddr('<>'), ('', ''))
     # A bare address carries no real name.
     eq(rfc822.parseaddr('aperson@dom.ain'), ('', 'aperson@dom.ain'))
     # A trailing parenthesised comment is reported as the real name.
     eq(rfc822.parseaddr('bperson@dom.ain (Bea A. Person)'),
        ('Bea A. Person', 'bperson@dom.ain'))
     # "Name <address>" form.
     eq(rfc822.parseaddr('Cynthia Person <cperson@dom.ain>'),
        ('Cynthia Person', 'cperson@dom.ain'))
开发者ID:Bail-jw,项目名称:mediacomp-jes,代码行数:8,代码来源:test_rfc822.py


示例2: add_bcc

 def add_bcc(self, bcc):
     """Append one or more BCC recipients to ``self.bcc``.

     ``bcc`` may be a single address string or an iterable of such
     values; iterables are walked recursively.  Commas are stripped
     from a string before it is handed to ``rfc822.parseaddr`` and only
     the address part of the result is stored.  On Python 2 a
     ``unicode`` value is stored UTF-8 encoded.
     """
     if isinstance(bcc, str):
         self.bcc.append(rfc822.parseaddr(bcc.replace(",", ""))[1])
         return
     # ``unicode`` only exists on Python 2; the version test short-circuits
     # before the name is looked up on Python 3.
     if sys.version_info < (3, 0) and isinstance(bcc, unicode):
         address = rfc822.parseaddr(bcc.replace(",", ""))[1]
         self.bcc.append(address.encode("utf-8"))
         return
     if hasattr(bcc, "__iter__"):
         for entry in bcc:
             self.add_bcc(entry)
开发者ID:jonwalch,项目名称:littleURL,代码行数:10,代码来源:message.py


示例3: add_cc

 def add_cc(self, cc):
     """Append one or more CC recipients to ``self.cc``.

     A string is stripped of commas, parsed with ``rfc822.parseaddr``
     and its address part stored (UTF-8 encoded for a Python 2
     ``unicode`` value).  Any other object exposing ``__iter__`` is
     treated as a collection and each item is added recursively.
     Everything else is ignored.
     """
     is_py2_unicode = False
     if not isinstance(cc, str):
         # Short-circuit keeps ``unicode`` from being looked up on Python 3.
         is_py2_unicode = sys.version_info < (3, 0) and isinstance(cc, unicode)
         if not is_py2_unicode:
             if hasattr(cc, '__iter__'):
                 for item in cc:
                     self.add_cc(item)
             return

     address = rfc822.parseaddr(cc.replace(',', ''))[1]
     if is_py2_unicode:
         address = address.encode('utf-8')
     self.cc.append(address)
开发者ID:thinkingserious,项目名称:sendgrid-python,代码行数:10,代码来源:message.py


示例4: write_status_json

def write_status_json(component, merges, left_distro, right_distro):
    """Dump the merge status of ``component`` as hand-written JSON.

    The array is written to ``<ROOT>/merges/<component>.json.new`` and
    that file is then renamed to ``<ROOT>/merges/<component>.json``.

    Each item of ``merges`` is a 9-tuple ``(uploaded, priority, package,
    user, uploader, source, base_version, left_version, right_version)``;
    ``source["Binary"]`` supplies the comma-separated binary package
    names.  ``left_distro`` and ``right_distro`` are accepted but not
    used by this function.
    """
    def escape(text):
        # Backslashes first, so the ones added in front of quotes are
        # not doubled again.
        return text.replace('\\', '\\\\').replace('"', '\\"')

    target = "%s/merges/%s.json" % (ROOT, component)
    scratch = target + ".new"
    with open(scratch, "w") as out:
        # No json module is used here; the document is assembled by hand.
        print('[', file=out)
        for position, entry in enumerate(merges, 1):
            (uploaded, priority, package, user, uploader, source,
             base_version, left_version, right_version) = entry

            # source_package, short_description, and link are for
            # Harvest (http://daniel.holba.ch/blog/?p=838).
            fields = [
                ' {',
                '"source_package": "%s",' % package,
                '"short_description": "merge %s",' % right_version,
                '"link": "https://merges.tanglu.org/%s/%s/",'
                % (pathhash(package), package),
                '"uploaded": "%s",' % uploaded,
                '"priority": "%s",' % priority,
            ]

            if user is not None:
                fields.append('"user": "%s",' % escape(user))
                if uploader is not None:
                    usr_name = parseaddr(user)[0]
                    upl_name = parseaddr(uploader)[0]
                    # The uploader is listed only when the user has a
                    # display name that differs from the uploader's.
                    if len(usr_name) and usr_name != upl_name:
                        fields.append('"uploader": "%s",' % escape(uploader))

            binary_names = re.split(', *', source["Binary"].replace('\n', ''))
            quoted = ', '.join('"%s"' % binary for binary in binary_names)
            fields.append('"binaries": [ %s ],' % quoted)
            fields.append('"base_version": "%s",' % base_version)
            fields.append('"left_version": "%s",' % left_version)
            fields.append('"right_version": "%s"' % right_version)

            # Every object but the last is followed by a comma.
            closer = '},' if position < len(merges) else '}'
            print(' '.join(fields), closer, file=out)
        print(']', file=out)

    os.rename(scratch, target)
开发者ID:tanglu-org,项目名称:merge-o-matic,代码行数:49,代码来源:merge-status.py


示例5: visit_field

 def visit_field(self, node):
     """Record a docinfo field in ``self.docinfo``, then skip the node.

     Only fields whose parent is a ``nodes.docinfo`` are recorded.  The
     field name is lower-cased and stripped of spaces.  "moduleauthor"
     is split by ``rfc822.parseaddr`` into ``moduleauthor`` /
     ``moduleauthoremail``; "author" and "sectionauthor" both go into
     ``sectionauthor`` / ``sectionauthoremail``.  Any other field is
     stored under its own name, and "module" additionally sets the
     label prefix.

     Raises:
         nodes.SkipNode: always.
     """
     if isinstance(node.parent, nodes.docinfo):
         field = node[0].astext().lower().replace(" ", "")
         # Maps an author-type field to the docinfo key that receives
         # the parsed name; "<key>email" receives the address.
         author_key = {
             "moduleauthor": "moduleauthor",
             "author": "sectionauthor",
             "sectionauthor": "sectionauthor",
         }.get(field)
         if author_key is not None:
             (realname, address) = rfc822.parseaddr(node[1].astext())
             self.docinfo[author_key] = realname
             self.docinfo[author_key + "email"] = address
         else:
             if field == "module":
                 self.set_label_prefix(node[1].astext())
             self.docinfo[field] = node[1].astext()
     raise nodes.SkipNode
开发者ID:Distrotech,项目名称:docutils,代码行数:16,代码来源:rst2docpy.py


示例6: add_bcc

 def add_bcc(self, bcc):
     """Append one or more BCC recipients to ``self.bcc``.

     A ``str`` has its commas removed, is parsed with
     ``rfc822.parseaddr`` and its address part is stored.  Anything
     else is iterated and each item is added recursively.
     """
     # NOTE(review): any non-``str`` value is iterated, so a string of
     # another type (e.g. Python 2 ``unicode``) would be walked
     # character by character — confirm callers only pass ``str``.
     if not isinstance(bcc, str):
         for entry in bcc:
             self.add_bcc(entry)
         return
     address = rfc822.parseaddr(bcc.replace(',', ''))[1]
     self.bcc.append(address)
开发者ID:Dinoshauer,项目名称:sendgrid-python,代码行数:7,代码来源:message.py


示例7: get_message

def get_message(msg):
    """Extract the fields of interest from a parsed mail message.

    Args:
        msg: message object supporting header lookup (``msg[name]`` and
            ``msg.get``) and ``as_string()``.

    Returns:
        dict with the keys ``from`` (the ``rfc822.parseaddr`` result),
        ``subject``, ``to`` and ``cc`` (address lists), ``url`` (the
        Archived-At or X-Archived-At value without angle brackets),
        ``date`` (a ``datetime.datetime``), ``message_id``,
        ``in_reply_to`` and ``raw`` (the whole message as a string).
        ``url`` is left as-is when neither archive header has a value,
        and ``date`` is ``None`` when the Date header is missing or
        cannot be parsed; previously both cases raised.
    """
    fr = rfc822.parseaddr(msg['from'])
    to = rfc822.AddressList(msg['to']).addresslist
    cc = rfc822.AddressList(msg['cc']).addresslist
    subject = msg['subject']

    # parsedate() returns None for an unparsable date, so only build a
    # datetime when there is a time tuple to unpack.
    raw_date = msg['date']
    parsed_date = rfc822.parsedate(raw_date) if raw_date else None
    if parsed_date is not None:
        date = datetime.datetime(*parsed_date[:6])
    else:
        date = None

    # Fall back to the X- header, and only strip when a value exists.
    url = msg['Archived-At'] or msg['X-Archived-At']
    if url:
        url = url.strip("<>")

    message_id = msg['Message-ID']
    in_reply_to = msg.get('In-Reply-To', None)

    return {
        "from": fr,
        "subject": subject,
        "to": to,
        "cc": cc,
        "url": url,
        "date": date,
        "message_id": message_id,
        "in_reply_to": in_reply_to,
        "raw": msg.as_string()
    }
开发者ID:edsu,项目名称:rage14,代码行数:27,代码来源:load.py


示例8: _checkStateMX

 def _checkStateMX(self):
     """Group waiting messages by recipient domain and start MX lookups.

     The waiting messages are taken from the queue in reversed order.
     Each message whose envelope recipient contains an "@" is marked as
     relaying and its path is filed under the recipient's domain;
     others are logged and skipped.  Grouping stops once the number of
     domains reaches ``self.maxConnections - len(self.managed)``.  For
     every domain a factory is created, registered in ``self.managed``
     and an MX lookup is started whose result is passed on to
     ``self._cbExchange`` / ``self._ebExchange``.
     """
     pending = self.queue.getWaiting()
     pending.reverse()

     byDomain = {}
     for message in pending:
         sender, recipient = self.queue.getEnvelope(message)
         realname, address = rfc822.parseaddr(recipient)
         pieces = address.split('@', 1)
         if len(pieces) == 2:
             self.queue.setRelaying(message)
             byDomain.setdefault(pieces[1], []).append(
                 self.queue.getPath(message))
             if len(byDomain) >= (self.maxConnections - len(self.managed)):
                 break
         else:
             log.err("Illegal message destination: " + recipient)

     if self.mxcalc is None:
         self.mxcalc = MXCalculator()

     for (domain, paths) in byDomain.iteritems():
         factory = self.factory(paths, self, *self.fArgs, **self.fKwArgs)
         self.managed[factory] = map(os.path.basename, paths)
         # Same callback chain as a fluent expression, one step per line.
         lookup = self.mxcalc.getMX(domain)
         lookup = lookup.addCallback(lambda mx: str(mx.name))
         lookup = lookup.addCallback(self._cbExchange, self.PORT, factory)
         lookup.addErrback(self._ebExchange, factory, domain)
开发者ID:galaxysd,项目名称:BitTorrent,代码行数:30,代码来源:relaymanager.py


示例9: mail_from_name

 def mail_from_name(self, do_encode=True):
     """Return the sender's display name for the current message.

     The name part of ``self.context.from_addr`` is used; when it is
     empty the whole ``from_addr`` value is used instead.  With
     ``do_encode`` true the result is passed through ``encode``
     together with the mailing list, otherwise it is returned as-is.
     """
     sender = self.context.from_addr
     display = parseaddr(sender)[0]
     if not display:
         display = sender
     return encode(display, self.getMailingList()) if do_encode else display
开发者ID:socialplanning,项目名称:opencore-listen,代码行数:7,代码来源:mail_message_views.py


示例10: _checkStateMX

    def _checkStateMX(self):
        """Start one relay attempt per recipient domain for waiting messages.

        The waiting messages are taken from the queue in reversed order.
        Each message whose envelope recipient contains an "@" is marked
        as relaying and its path is filed under the recipient's domain;
        others are logged and skipped.  Grouping stops once the number
        of domains reaches ``self.maxConnections - len(self.managed)``.
        For every domain an ``_AttemptManager`` and a factory are
        created and an MX lookup is started.

        Returns:
            A ``DeferredList`` over the completion deferreds of the
            attempts started by this call.
        """
        nextMessages = self.queue.getWaiting()
        nextMessages.reverse()

        exchanges = {}
        for msg in nextMessages:
            from_, to = self.queue.getEnvelope(msg)
            name, addr = rfc822.parseaddr(to)
            parts = addr.split('@', 1)
            if len(parts) != 2:
                log.err("Illegal message destination: " + to)
                continue
            domain = parts[1]

            self.queue.setRelaying(msg)
            exchanges.setdefault(domain, []).append(self.queue.getPath(msg))
            if len(exchanges) >= (self.maxConnections - len(self.managed)):
                break

        if self.mxcalc is None:
            self.mxcalc = MXCalculator()

        relays = []
        for (domain, msgs) in exchanges.iteritems():
            manager = _AttemptManager(self)
            factory = self.factory(msgs, manager, *self.fArgs, **self.fKwArgs)
            self.managed[factory] = map(os.path.basename, msgs)
            relayAttemptDeferred = manager.getCompletionDeferred()
            connectSetupDeferred = self.mxcalc.getMX(domain)
            connectSetupDeferred.addCallback(lambda mx: str(mx.name))
            connectSetupDeferred.addCallback(self._cbExchange, self.PORT, factory)
            # Bind this iteration's deferred as a default argument: a plain
            # closure would look the loop variable up when the errback
            # fires and could fail the attempt of a different domain.
            # The failure is returned so the next errback still sees it.
            connectSetupDeferred.addErrback(
                lambda err, attempt=relayAttemptDeferred:
                    (attempt.errback(err), err)[1])
            connectSetupDeferred.addErrback(self._ebExchange, factory, domain)
            relays.append(relayAttemptDeferred)
        return DeferredList(relays)
开发者ID:Almad,项目名称:twisted,代码行数:35,代码来源:relaymanager.py


示例11: recoverDelayed

        def recoverDelayed(self, addr, smtp):
            """Re-deliver the mails stored in the delayed mbox of ``addr``.

            Every message in ``<homedir>/.animail/animailDelayed/<addr>``
            is handed to ``self.deliver_smtp`` with the address part of
            its From header as sender.  The mbox is deleted only when
            all messages were processed without an exception; on any
            error a diagnostic and the traceback are printed and the
            file is kept.  The file handle is closed in both cases.
            """
            try:
                filename = self.config.homedir + '/.animail/animailDelayed/' + addr
                fp = open(filename,'r+')
                try:
                    mbox = mailbox.UnixMailbox(fp)
                    mail = mbox.next()
                    mailcount = 0
                    while mail is not None:
                        header = ''.join(mail.headers)
                        body = ''
                        # Placeholder that is neither empty nor a 'From '
                        # line, so the read loop runs at least once.
                        line = 'Ugh, This is a BUG in ANIMAIL'
                        while (line[:5] != 'From ' ) and (line != ''):
                            line = fp.readline()
                            body += line

                        (foo,FROM) = rfc822.parseaddr(mail['from'])
                        if self.__class__.__name__ == "Pop3Server":
                            body = header + body

                        self.deliver_smtp(header,body,FROM,smtp)
                        mailcount += 1
                        mail = mbox.next()
                    aprint(_("%d mails recovered from %s") % (mailcount, addr), general.VIOLET)
                finally:
                    # Release the handle on failure too; it used to leak.
                    fp.close()
            except Exception:
                # Interpreter-exit requests (KeyboardInterrupt, SystemExit)
                # now propagate; the mbox is still kept in that case.
                print _("Error trying to delete the delayed mbox of the address %s") % addr
                print _("The mailbox won't be deleted, if you want to read those messages")
                print _("open it manually (.animail/animailDelayed/%s)") % addr
                print _("The error was: ")
                import traceback
                traceback.print_exc(file=sys.stderr)
                return
            os.unlink(filename)
开发者ID:juanjux,项目名称:animail,代码行数:33,代码来源:mailserver.py


示例12: migrate

    def migrate(self):
        """Migrate the mailing list in ``self.context`` to the policy model.

        Steps, in order: choose the list type from the old
        ``moderated`` / ``closed`` flags, re-subscribe the old
        subscribers, unsubscribe those who do not receive mail, copy
        queued posts to the pending-moderation list, build the manager
        tuple from the owner and moderators, and translate the
        ``archived`` option to its numeric code.

        Returns:
            'already migrated', 'error translating archive option' or
            'successfully migrated'.
        """
        if self.is_updated():
            return 'already migrated'

        context = self.context

        # Pick the list type matching the previous settings; this may
        # need to mark the appropriate interface on the list as well.
        if context.moderated:
            chosen_type = PostModeratedListTypeDefinition
        elif context.closed:
            chosen_type = MembershipModeratedListTypeDefinition
        else:
            chosen_type = PublicListTypeDefinition
        context.list_type = chosen_type

        # Carry the membership over through the current adapters.
        listen_data = IAnnotations(context).get('listen', {})
        previous_subscribers = listen_data.get('subscribers', [])
        membership = IWriteMembershipList(context)
        for subscriber in previous_subscribers:
            membership.subscribe(subscriber)

        # Members who do not receive mail are unsubscribed again
        # (but stay allowed senders).
        for allowed_sender in listen_data.get('norecvmail', []):
            membership.unsubscribe(allowed_sender)

        # Copy the messages awaiting moderation.
        self.mod_post_pending_list = getAdapter(context, IPostPendingList, 'pending_pmod_post')
        for message_id in context.mqueue.objectIds():
            (header, body) = splitMail(context.mqueue[message_id])
            (user_name, user_email) = parseaddr(header.get('from', ''))
            self.mod_post_pending_list.add(
                user_email,
                user_name=user_name,
                post={'header': header, 'body': body})

        # The owner followed by the moderators become the list managers.
        context.managers = tuple([context.list_owner] + list(context.moderators))
        convert_manager_emails_to_memberids(context)

        # Translate the archive vocabulary to its numeric codes.
        archive_codes = (
            ('not archived', 2),
            ('plain text', 1),
            ('with attachments', 0),
        )
        for label, code in archive_codes:
            if context.archived == label:
                context.archived = code
                break
        else:
            return 'error translating archive option'

        # Annotate the list to say the migration completed.
        self.migration_annot.append('policy_migration')

        return 'successfully migrated'
开发者ID:socialplanning,项目名称:opencore-listen,代码行数:60,代码来源:migrations.py


示例13: check_lists

    def check_lists(self, config):
        """ Check if the user is authorized to use the handler

        The address part of ``self.sender`` is tested against the
        BLACK_LIST and then the WHITE_LIST option of ``self.section``
        in ``config``.

        Returns:
            False if the sender is found in BLACK_LIST; otherwise
            whether the sender is found in WHITE_LIST when that option
            exists; True when no WHITE_LIST is configured.
        """
        # NOTE(review): if config.get() returns a plain string, the
        # ``in`` tests below are substring matches — confirm intended.

        # Use email part of sender
        (name, sender) = rfc822.parseaddr(self.sender)
        self.log.debug("[check_lists]: sender = %s" % self.sender)
        self.log.debug("[check_lists]: sender(email) = %s" % sender)

        # First check black list
        if config.has_option(self.section, "BLACK_LIST"):
            self.log.debug("[check_lists]: BLACK_LIST exists")
            BL = config.get(self.section, "BLACK_LIST")
            self.log.debug("[check_lists]: BLACK_LIST=%s" % BL)
            if sender in BL:
                return False

        # The black list did not reject the sender; check the white list
        if config.has_option(self.section, "WHITE_LIST"):
            self.log.debug("[check_lists]: WHITE_LIST exists")
            WL = config.get(self.section, "WHITE_LIST")
            self.log.debug("[check_lists]: WHITE_LIST=%s" % WL)
            return sender in WL

        else:
            # If no white list is provided, mbot usage is accepted
            return True
开发者ID:nah-ko,项目名称:MBot,代码行数:26,代码来源:MailHandler.py


示例14: parse_and_add

 def parse_and_add(self, to):
     """Register ``to`` with the parent class and record its parts.

     After delegating to ``Mail``'s parent ``add_to``, commas are
     removed from ``to`` and the result is split with
     ``rfc822.parseaddr``.  A non-empty address is appended to
     ``self.to``; a non-empty name is passed to ``self.add_to_name``.
     """
     super(Mail, self).add_to(to)
     parsed = rfc822.parseaddr(to.replace(',', ''))
     display_name = parsed[0]
     address = parsed[1]
     if address:
         self.to.append(address)
     if display_name:
         self.add_to_name(display_name)
开发者ID:jhuang314,项目名称:sendgrid-python,代码行数:7,代码来源:message.py


示例15: getAddressInfo

def getAddressInfo(obj, subscription_manager):
    """Return ``(member, obj.from_addr)`` for the sender of ``obj``.

    ``member`` is the result of ``lookup_member_id`` for the address
    part of ``obj.from_addr``.  It is ``None`` when
    ``subscription_manager`` is ``None`` (no lookup is made) or when
    the lookup itself returns ``None``.
    """
    member = None
    if subscription_manager is not None:
        sender_address = parseaddr(obj.from_addr)[1]
        member = lookup_member_id(sender_address, obj)
    return member, obj.from_addr
开发者ID:socialplanning,项目名称:opencore-listen,代码行数:7,代码来源:browser_utils.py


示例16: parse_person

def parse_person(val):
    """Split a full e-mail address into a ``(name, address)`` pair.

    All commas are dropped from ``val`` before it is handed to
    ``rfc822.parseaddr``.
    """
    # Display names may contain commas (e.g. "Adam C. Powell, IV
    # <address>"), which rfc822.parseaddr seems not to handle properly,
    # so they are removed up front.
    without_commas = ''.join(val.split(','))
    return rfc822.parseaddr(without_commas)
开发者ID:vitaminmoo,项目名称:unnaturalcode,代码行数:7,代码来源:packages.py


示例17: _from

 def _from(self, fromfield):
     (name, email) = rfc822.parseaddr(fromfield)
     if name == None and email == None:
         print 'Warning!  Could not parse ' + fromfield + '.  Skipping.'
         return {}
     [(text, charset)] = header.decode_header(name)
     name = unicode(text, charset or 'ascii')
     return {'name': name, 'email': email.lower()}
开发者ID:bahmanm,项目名称:amailyser,代码行数:8,代码来源:workhorse.py


示例18: create_form_from_mail

def create_form_from_mail(config, mail, tmpfn):
    subject = mail.get("Subject", "[no subject]")
    sender = mail.get("From", "Unknown <[email protected]>")
    
    xml = None
    body = ""

    if mail.is_multipart():
        html = None
        for part in mail.walk():
            if part.get_content_maintype() == "multipart":
                continue
            elif part.get_content_type() == "d-rats/form_xml":
                xml = str(part.get_payload())
                break # A form payload trumps all
            elif part.get_content_type() == "text/plain":
                body += part.get_payload(decode=True)
            elif part.get_content_type() == "text/html":
                html = part.get_payload(decode=True)
        if not body:
            body = html
    else:
        body = mail.get_payload(decode=True)

    if not body and not xml:
        raise Exception("Unable to find a usable part")

    messageid = mail.get("Message-ID", time.strftime("%m%d%Y%H%M%S"))
    if not msgrouting.msg_lock(tmpfn):
        print "AIEE: Unable to lock incoming email message file!"

    if xml:
        f = file(tmpfn, "w")
        f.write(xml)
        f.close()
        form = formgui.FormFile(tmpfn)
        recip = form.get_recipient_string()
        if "%" in recip:
            recip, addr = recip.split("%", 1)
            recip = recip.upper()
    else:
        print "Email from %s: %s" % (sender, subject)

        recip, addr = rfc822.parseaddr(mail.get("To", "UNKNOWN"))

        efn = os.path.join(config.form_source_dir(), "email.xml")
        form = formgui.FormFile(efn)
        form.set_field_value("_auto_sender", sender)
        form.set_field_value("recipient", recip)
        form.set_field_value("subject", "EMAIL: %s" % subject)
        form.set_field_value("message", utils.filter_to_ascii(body))
        form.set_path_src(sender.strip())
        form.set_path_dst(recip.strip())
        form.set_path_mid(messageid)

    form.save_to(tmpfn)

    return form
开发者ID:coddingtonbear,项目名称:d-rats,代码行数:58,代码来源:emailgw.py


示例19: to_addresses

 def to_addresses(self):
     """Return the address part of every recipient in ``self.to_header``.

     The header is parsed as an address list rather than being split on
     commas, so a quoted display name that contains a comma (for
     example ``"Doe, John" <address>``) is no longer cut in two.
     Segments that contain no address no longer produce an entry.

     Returns:
         list of address strings, in header order.
     """
     recipients = rfc822.AddressList(self.to_header).addresslist
     return [address for (_name, address) in recipients]
开发者ID:gelliravi,项目名称:django-mailbox,代码行数:9,代码来源:models.py


示例20: __getitem__

 def __getitem__(self, key):
     """Return the header ``key`` of the wrapped message.

     "to", "bcc" and "cc" (case-insensitive) are returned through
     ``self.get_addresslist``; "from" and "envelope-to" are returned as
     the ``parseaddr`` result of the header; every other header is
     returned unchanged.

     Raises:
         AttributeError: if the message has no such header.
     """
     lowered = key.lower()
     if not self.message.has_key(key):
         raise AttributeError("%s has no such key %s" % (repr(self), key))
     if lowered in ('to', 'bcc', 'cc'):
         return self.get_addresslist(lowered)
     if lowered in ('from', 'envelope-to'):
         return parseaddr(self.message[lowered])
     return self.message[key]
开发者ID:albus12138,项目名称:django-mailserver,代码行数:10,代码来源:message.py



注:本文中的rfc822.parseaddr函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python rfc822.parsedate函数代码示例发布时间:2022-05-26
下一篇:
Python rfc822.mktime_tz函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap