本文整理汇总了Python中rfc822.parseaddr函数的典型用法代码示例。如果您正苦于以下问题:Python parseaddr函数的具体用法?Python parseaddr怎么用?Python parseaddr使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了parseaddr函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_parseaddr
def test_parseaddr(self):
eq = self.assertEqual
eq(rfc822.parseaddr('<>'), ('', ''))
eq(rfc822.parseaddr('[email protected]'), ('', '[email protected]'))
eq(rfc822.parseaddr('[email protected] (Bea A. Person)'),
('Bea A. Person', '[email protected]'))
eq(rfc822.parseaddr('Cynthia Person <[email protected]>'),
('Cynthia Person', '[email protected]'))
开发者ID:Bail-jw,项目名称:mediacomp-jes,代码行数:8,代码来源:test_rfc822.py
示例2: add_bcc
def add_bcc(self, bcc):
if isinstance(bcc, str):
email = rfc822.parseaddr(bcc.replace(",", ""))[1]
self.bcc.append(email)
elif sys.version_info < (3, 0) and isinstance(bcc, unicode):
email = rfc822.parseaddr(bcc.replace(",", ""))[1].encode("utf-8")
self.bcc.append(email)
elif hasattr(bcc, "__iter__"):
for email in bcc:
self.add_bcc(email)
开发者ID:jonwalch,项目名称:littleURL,代码行数:10,代码来源:message.py
示例3: add_cc
def add_cc(self, cc):
if isinstance(cc, str):
email = rfc822.parseaddr(cc.replace(',', ''))[1]
self.cc.append(email)
elif sys.version_info < (3, 0) and isinstance(cc, unicode):
email = rfc822.parseaddr(cc.replace(',', ''))[1].encode('utf-8')
self.cc.append(email)
elif hasattr(cc, '__iter__'):
for email in cc:
self.add_cc(email)
开发者ID:thinkingserious,项目名称:sendgrid-python,代码行数:10,代码来源:message.py
示例4: write_status_json
def write_status_json(component, merges, left_distro, right_distro):
"""Write out the merge status JSON dump."""
status_file = "%s/merges/%s.json" % (ROOT, component)
with open(status_file + ".new", "w") as status:
# No json module available on merges.tanglu.org right now, but it's
# not that hard to do it ourselves.
print('[', file=status)
cur_merge = 0
for uploaded, priority, package, user, uploader, source, \
base_version, left_version, right_version in merges:
print(' {', end=' ', file=status)
# source_package, short_description, and link are for
# Harvest (http://daniel.holba.ch/blog/?p=838).
print('"source_package": "%s",' % package, end=' ', file=status)
print('"short_description": "merge %s",' % right_version,
end=' ', file=status)
print('"link": "https://merges.tanglu.org/%s/%s/",' %
(pathhash(package), package), end=' ', file=status)
print('"uploaded": "%s",' % uploaded, end=' ', file=status)
print('"priority": "%s",' % priority, end=' ', file=status)
if user is not None:
who = user
who = who.replace('\\', '\\\\')
who = who.replace('"', '\\"')
print('"user": "%s",' % who, end=' ', file=status)
if uploader is not None:
(usr_name, usr_mail) = parseaddr(user)
(upl_name, upl_mail) = parseaddr(uploader)
if len(usr_name) and usr_name != upl_name:
u_who = uploader
u_who = u_who.replace('\\', '\\\\')
u_who = u_who.replace('"', '\\"')
print('"uploader": "%s",' % u_who, end=' ', file=status)
binaries = re.split(', *', source["Binary"].replace('\n', ''))
print('"binaries": [ %s ],' %
', '.join(['"%s"' % b for b in binaries]),
end=' ', file=status)
print('"base_version": "%s",' % base_version, end=' ', file=status)
print('"left_version": "%s",' % left_version, end=' ', file=status)
print('"right_version": "%s"' % right_version,
end=' ', file=status)
cur_merge += 1
if cur_merge < len(merges):
print('},', file=status)
else:
print('}', file=status)
print(']', file=status)
os.rename(status_file + ".new", status_file)
开发者ID:tanglu-org,项目名称:merge-o-matic,代码行数:49,代码来源:merge-status.py
示例5: visit_field
def visit_field(self, node):
if isinstance(node.parent, nodes.docinfo):
name = node[0].astext().lower().replace(" ","")
if name == "moduleauthor":
(ename, email) = rfc822.parseaddr(node[1].astext())
self.docinfo["moduleauthor"] = ename
self.docinfo["moduleauthoremail"] = email
elif name in ("author", "sectionauthor") :
(ename, email) = rfc822.parseaddr(node[1].astext())
self.docinfo["sectionauthor"] = ename
self.docinfo["sectionauthoremail"] = email
else:
if name == "module":
self.set_label_prefix(node[1].astext())
self.docinfo[name] = node[1].astext()
raise nodes.SkipNode
开发者ID:Distrotech,项目名称:docutils,代码行数:16,代码来源:rst2docpy.py
示例6: add_bcc
def add_bcc(self, bcc):
if isinstance(bcc, str):
email = rfc822.parseaddr(bcc.replace(',', ''))[1]
self.bcc.append(email)
else:
for email in bcc:
self.add_bcc(email)
开发者ID:Dinoshauer,项目名称:sendgrid-python,代码行数:7,代码来源:message.py
示例7: get_message
def get_message(msg):
fr = rfc822.parseaddr(msg['from'])
to = rfc822.AddressList(msg['to']).addresslist
cc = rfc822.AddressList(msg['cc']).addresslist
subject = msg['subject']
date = rfc822.parsedate(msg['date'])
date = datetime.datetime(*date[:6])
url = msg['Archived-At']
if not url:
url = msg['X-Archived-At']
url = url.strip("<>")
message_id = msg['Message-ID']
in_reply_to = msg.get('In-Reply-To', None)
return {
"from": fr,
"subject": subject,
"to": to,
"cc": cc,
"url": url,
"date": date,
"message_id": message_id,
"in_reply_to": in_reply_to,
"raw": msg.as_string()
}
return None
开发者ID:edsu,项目名称:rage14,代码行数:27,代码来源:load.py
示例8: _checkStateMX
def _checkStateMX(self):
nextMessages = self.queue.getWaiting()
nextMessages.reverse()
exchanges = {}
for msg in nextMessages:
from_, to = self.queue.getEnvelope(msg)
name, addr = rfc822.parseaddr(to)
parts = addr.split('@', 1)
if len(parts) != 2:
log.err("Illegal message destination: " + to)
continue
domain = parts[1]
self.queue.setRelaying(msg)
exchanges.setdefault(domain, []).append(self.queue.getPath(msg))
if len(exchanges) >= (self.maxConnections - len(self.managed)):
break
if self.mxcalc is None:
self.mxcalc = MXCalculator()
for (domain, msgs) in exchanges.iteritems():
factory = self.factory(msgs, self, *self.fArgs, **self.fKwArgs)
self.managed[factory] = map(os.path.basename, msgs)
self.mxcalc.getMX(domain
).addCallback(lambda mx: str(mx.name),
).addCallback(self._cbExchange, self.PORT, factory
).addErrback(self._ebExchange, factory, domain
)
开发者ID:galaxysd,项目名称:BitTorrent,代码行数:30,代码来源:relaymanager.py
示例9: mail_from_name
def mail_from_name(self, do_encode=True):
addr = self.context.from_addr
name = parseaddr(addr)[0] or addr
if do_encode:
return encode(name, self.getMailingList())
else:
return name
开发者ID:socialplanning,项目名称:opencore-listen,代码行数:7,代码来源:mail_message_views.py
示例10: _checkStateMX
def _checkStateMX(self):
nextMessages = self.queue.getWaiting()
nextMessages.reverse()
exchanges = {}
for msg in nextMessages:
from_, to = self.queue.getEnvelope(msg)
name, addr = rfc822.parseaddr(to)
parts = addr.split('@', 1)
if len(parts) != 2:
log.err("Illegal message destination: " + to)
continue
domain = parts[1]
self.queue.setRelaying(msg)
exchanges.setdefault(domain, []).append(self.queue.getPath(msg))
if len(exchanges) >= (self.maxConnections - len(self.managed)):
break
if self.mxcalc is None:
self.mxcalc = MXCalculator()
relays = []
for (domain, msgs) in exchanges.iteritems():
manager = _AttemptManager(self)
factory = self.factory(msgs, manager, *self.fArgs, **self.fKwArgs)
self.managed[factory] = map(os.path.basename, msgs)
relayAttemptDeferred = manager.getCompletionDeferred()
connectSetupDeferred = self.mxcalc.getMX(domain)
connectSetupDeferred.addCallback(lambda mx: str(mx.name))
connectSetupDeferred.addCallback(self._cbExchange, self.PORT, factory)
connectSetupDeferred.addErrback(lambda err: (relayAttemptDeferred.errback(err), err)[1])
connectSetupDeferred.addErrback(self._ebExchange, factory, domain)
relays.append(relayAttemptDeferred)
return DeferredList(relays)
开发者ID:Almad,项目名称:twisted,代码行数:35,代码来源:relaymanager.py
示例11: recoverDelayed
def recoverDelayed(self, addr, smtp):
try:
filename = self.config.homedir + '/.animail/animailDelayed/' + addr
fp = open(filename,'r+')
mbox = mailbox.UnixMailbox(fp)
mail = mbox.next()
mailcount = 0
while mail != None:
header = ''.join(mail.headers)
body = ''
line = 'Ugh, This is a BUG in ANIMAIL'
while (line[:5] != 'From ' ) and (line != ''):
line = fp.readline()
body += line
(foo,FROM) = rfc822.parseaddr(mail['from'])
if self.__class__.__name__ == "Pop3Server":
body = header + body
self.deliver_smtp(header,body,FROM,smtp)
mailcount += 1
mail = mbox.next()
aprint(_("%d mails recovered from %s") % (mailcount, addr), general.VIOLET)
fp.close()
except:
print _("Error trying to delete the delayed mbox of the address %s") % addr
print _("The mailbox won't be deleted, if you want to read those messages")
print _("open it manually (.animail/animailDelayed/%s)") % addr
print _("The error was: ")
import traceback
traceback.print_exc(file=sys.stderr)
return
os.unlink(filename)
开发者ID:juanjux,项目名称:animail,代码行数:33,代码来源:mailserver.py
示例12: migrate
def migrate(self):
if self.is_updated():
return 'already migrated'
# set the appropriate list type based on the previous settings
# this may need to mark the appropriate interface on the mailing
# list as well
if self.context.moderated:
self.context.list_type = PostModeratedListTypeDefinition
elif self.context.closed:
self.context.list_type = MembershipModeratedListTypeDefinition
else:
self.context.list_type = PublicListTypeDefinition
# copy over the membership stuff
annot = IAnnotations(self.context)
listen_annot = annot.get('listen', {})
old_subscribers = listen_annot.get('subscribers', [])
# create the new annotations by using current adapters
mem_list = IWriteMembershipList(self.context)
for subscriber in old_subscribers:
mem_list.subscribe(subscriber)
# unsubscribe (but leave as allowed senders) those who don't
# receive mail
nomail = listen_annot.get('norecvmail', [])
for allowed_sender in nomail:
mem_list.unsubscribe(allowed_sender)
# copy over the moderation messages
self.mod_post_pending_list = getAdapter(self.context, IPostPendingList, 'pending_pmod_post')
for i in self.context.mqueue.objectIds():
(header, body) = splitMail(self.context.mqueue[i])
post = {'header':header, 'body':body}
(user_name, user_email) = parseaddr(header.get('from', ''))
self.mod_post_pending_list.add(user_email, user_name=user_name, post=post)
# creates list managers from moderators and list owner
managers = []
managers.append(self.context.list_owner)
for moderator in self.context.moderators:
managers.append(moderator)
self.context.managers = tuple(managers)
convert_manager_emails_to_memberids(self.context)
# translate archived vocabulary
if self.context.archived == 'not archived':
self.context.archived = 2
elif self.context.archived == 'plain text':
self.context.archived = 1
elif self.context.archived == 'with attachments':
self.context.archived = 0
else:
return 'error translating archive option'
# annotate the list to say the migration completed
self.migration_annot.append('policy_migration')
return 'successfully migrated'
开发者ID:socialplanning,项目名称:opencore-listen,代码行数:60,代码来源:migrations.py
示例13: check_lists
def check_lists(self, config):
""" Check if the user is authorized to use the handler """
# Use email part of sender
(name, sender) = rfc822.parseaddr(self.sender)
self.log.debug("[check_lists]: sender = %s" % self.sender)
self.log.debug("[check_lists]: sender(email) = %s" % sender)
# First check black list
if config.has_option(self.section, "BLACK_LIST"):
self.log.debug("[check_lists]: BLACK_LIST exists")
BL = config.get(self.section, "BLACK_LIST")
self.log.debug("[check_lists]: BLACK_LIST=%s" % BL)
if sender in BL:
return False
# Black list is not bloquing, check white list
if config.has_option(self.section, "WHITE_LIST"):
self.log.debug("[check_lists]: WHITE_LIST exists")
WL = config.get(self.section, "WHITE_LIST")
self.log.debug("[check_lists]: WHITE_LIST=%s" % WL)
return sender in WL
else:
# If no white list is provided, mbot usage is accepted
return True
开发者ID:nah-ko,项目名称:MBot,代码行数:26,代码来源:MailHandler.py
示例14: parse_and_add
def parse_and_add(self, to):
super(Mail, self).add_to(to)
name, email = rfc822.parseaddr(to.replace(',', ''))
if email:
self.to.append(email)
if name:
self.add_to_name(name)
开发者ID:jhuang314,项目名称:sendgrid-python,代码行数:7,代码来源:message.py
示例15: getAddressInfo
def getAddressInfo(obj, subscription_manager):
if subscription_manager is not None:
from_name, from_addr = parseaddr(obj.from_addr)
user = lookup_member_id(from_addr, obj)
if user is not None:
return user, obj.from_addr
return None, obj.from_addr
开发者ID:socialplanning,项目名称:opencore-listen,代码行数:7,代码来源:browser_utils.py
示例16: parse_person
def parse_person(val):
"""Parse a full email address into human-readable name and address."""
# Some addresses have commas in them, as in: "Adam C. Powell, IV
# <[email protected]>". rfc822.parseaddr seems not to
# handle this properly, so we munge them here.
val = val.replace(',', '')
return rfc822.parseaddr(val)
开发者ID:vitaminmoo,项目名称:unnaturalcode,代码行数:7,代码来源:packages.py
示例17: _from
def _from(self, fromfield):
(name, email) = rfc822.parseaddr(fromfield)
if name == None and email == None:
print 'Warning! Could not parse ' + fromfield + '. Skipping.'
return {}
[(text, charset)] = header.decode_header(name)
name = unicode(text, charset or 'ascii')
return {'name': name, 'email': email.lower()}
开发者ID:bahmanm,项目名称:amailyser,代码行数:8,代码来源:workhorse.py
示例18: create_form_from_mail
def create_form_from_mail(config, mail, tmpfn):
subject = mail.get("Subject", "[no subject]")
sender = mail.get("From", "Unknown <[email protected]>")
xml = None
body = ""
if mail.is_multipart():
html = None
for part in mail.walk():
if part.get_content_maintype() == "multipart":
continue
elif part.get_content_type() == "d-rats/form_xml":
xml = str(part.get_payload())
break # A form payload trumps all
elif part.get_content_type() == "text/plain":
body += part.get_payload(decode=True)
elif part.get_content_type() == "text/html":
html = part.get_payload(decode=True)
if not body:
body = html
else:
body = mail.get_payload(decode=True)
if not body and not xml:
raise Exception("Unable to find a usable part")
messageid = mail.get("Message-ID", time.strftime("%m%d%Y%H%M%S"))
if not msgrouting.msg_lock(tmpfn):
print "AIEE: Unable to lock incoming email message file!"
if xml:
f = file(tmpfn, "w")
f.write(xml)
f.close()
form = formgui.FormFile(tmpfn)
recip = form.get_recipient_string()
if "%" in recip:
recip, addr = recip.split("%", 1)
recip = recip.upper()
else:
print "Email from %s: %s" % (sender, subject)
recip, addr = rfc822.parseaddr(mail.get("To", "UNKNOWN"))
efn = os.path.join(config.form_source_dir(), "email.xml")
form = formgui.FormFile(efn)
form.set_field_value("_auto_sender", sender)
form.set_field_value("recipient", recip)
form.set_field_value("subject", "EMAIL: %s" % subject)
form.set_field_value("message", utils.filter_to_ascii(body))
form.set_path_src(sender.strip())
form.set_path_dst(recip.strip())
form.set_path_mid(messageid)
form.save_to(tmpfn)
return form
开发者ID:coddingtonbear,项目名称:d-rats,代码行数:58,代码来源:emailgw.py
示例19: to_addresses
def to_addresses(self):
addresses = []
for address in self.to_header.split(','):
addresses.append(
rfc822.parseaddr(
address
)[1]
)
return addresses
开发者ID:gelliravi,项目名称:django-mailbox,代码行数:9,代码来源:models.py
示例20: __getitem__
def __getitem__(self, key):
lkey = key.lower()
if not self.message.has_key(key):
raise AttributeError, "%s has no such key %s" % (repr(self), key)
if lkey in ['to', 'bcc', 'cc']:
adrs = self.get_addresslist(lkey)
return adrs
elif lkey in ['from', 'envelope-to']:
return parseaddr(self.message[lkey])
return self.message[key]
开发者ID:albus12138,项目名称:django-mailserver,代码行数:10,代码来源:message.py
注:本文中的rfc822.parseaddr函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论