本文整理汇总了Python中x84.bbs.DBProxy类的典型用法代码示例。如果您正苦于以下问题:Python DBProxy类的具体用法?Python DBProxy怎么用?Python DBProxy使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了DBProxy类的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: login
def login(session, user):
"""
Assign ``user`` to ``session`` and return time of last call.
performs various saves and lookups of call records
"""
session.user = user
# assign timeout preference
timeout = session.user.get('timeout', None)
if timeout is not None:
session.send_event('set-timeout', timeout)
# update call records
user.calls += 1
user.lastcall = time.time()
# save user record
if user.handle != u'anonymous':
user.save()
# update 'lastcalls' database
lc_db = DBProxy('lastcalls')
with lc_db:
previous_call, _, _ = lc_db.get(user.handle, (0, 0, 0,))
lc_db[user.handle] = (user.lastcall, user.calls, user.location)
return previous_call
开发者ID:ztaylor,项目名称:x84,代码行数:28,代码来源:top.py
示例2: refresh_automsg
def refresh_automsg(idx):
""" Refresh automsg database, display automsg of idx, return idx. """
session.flush_event('automsg')
autodb = DBProxy('automsg')
automsgs = sorted(autodb.values()) if len(autodb) else db_firstrecord
dblen = len(automsgs)
# bounds check
if idx < 0:
idx = dblen - 1
elif idx > dblen - 1:
idx = 0
tm_ago, handle, msg = automsgs[idx]
asc_ago = u'%s ago' % (timeago(time.time() - tm_ago))
disp = (u''.join(('\r\n\r\n',
term.bold(handle.rjust(max_user)),
term.bold_blue(u'/'),
term.blue(u'%*d' % (len('%d' % (dblen,)), idx,)),
term.bold_blue(u':'),
term.blue_reverse(msg.ljust(automsg_len)),
term.bold(u'\\'),
term.blue(asc_ago),)))
echo(u''.join((
u'\r\n\r\n',
Ansi(disp).wrap(term.width),
)))
return idx
开发者ID:jonny290,项目名称:yos-x84,代码行数:26,代码来源:logoff.py
示例3: prompt_tags
def prompt_tags(msg):
""" Prompt for and return tags wished for message. """
# pylint: disable=R0914,W0603
# Too many local variables
# Using the global statement
from x84.bbs import DBProxy, echo, getterminal, getsession
from x84.bbs import Ansi, LineEditor
session, term = getsession(), getterminal()
tagdb = DBProxy('tags')
msg_onlymods = (u"\r\nONlY MEMbERS Of thE '%s' OR '%s' "
"GROUP MAY CREAtE NEW tAGS." % (
term.bold_yellow('sysop'),
term.bold_blue('moderator'),))
msg_invalidtag = u"\r\n'%s' is not a valid tag."
prompt_tags1 = u"ENtER %s, COMMA-dEliMitEd. " % (
term.bold_red('TAG(s)'),)
prompt_tags2 = u"OR '/list', %s:quit\r\n : " % (
term.bold_yellow_underline('Escape'),)
while True:
# Accept user input for multiple 'tag's, or /list command
echo(u'\r\n\r\n')
echo(prompt_tags1)
echo(prompt_tags2)
width = term.width - 6
sel_tags = u', '.join(msg.tags)
inp_tags = LineEditor(width, sel_tags).read()
if inp_tags is not None and 0 == len(inp_tags.strip()):
# no tags must be (private ..)
msg.tags = set()
return True
if inp_tags is None or inp_tags.strip().lower() == '/quit':
return False
elif inp_tags.strip().lower() == '/list':
# list all available tags, and number of messages
echo(u'\r\n\r\nTags: \r\n')
all_tags = sorted(tagdb.items())
if 0 == len(all_tags):
echo(u'None !'.center(term.width / 2))
else:
echo(Ansi(u', '.join(([u'%s(%d)' % (_key, len(_value),)
for (_key, _value) in all_tags]))
).wrap(term.width - 2))
continue
echo(u'\r\n')
# search input as valid tag(s)
tags = set([inp.strip().lower() for inp in inp_tags.split(',')])
err = False
for tag in tags.copy():
if not tag in tagdb and not (
'sysop' in session.user.groups or
'moderator' in session.user.groups):
tags.remove(tag)
echo(msg_invalidtag % (term.bold_red(tag),))
err = True
if err:
echo(msg_onlymods)
continue
msg.tags = tags
return True
开发者ID:quastdog,项目名称:x84,代码行数:60,代码来源:writemsg.py
示例4: publish_network_messages
def publish_network_messages(net):
" Push messages to network. "
from x84.bbs import DBProxy
from x84.bbs.msgbase import format_origin_line, MSGDB
log = logging.getLogger(__name__)
log.debug(u'[{net[name]}] publishing new messages.'.format(net=net))
queuedb = DBProxy('{0}queues'.format(net['name']), use_session=False)
transdb = DBProxy('{0}trans'.format(net['name']), use_session=False)
msgdb = DBProxy(MSGDB, use_session=False)
# publish each message
for msg_id in sorted(queuedb.keys(),
cmp=lambda x, y: cmp(int(x), int(y))):
if msg_id not in msgdb:
log.warn('{net[name]} No such message (msg_id={msg_id})'
.format(net=net, msg_id=msg_id))
del queuedb[msg_id]
continue
msg = msgdb[msg_id]
trans_parent = None
if msg.parent is not None:
matches = [key for key, data in transdb.items()
if int(data) == msg.parent]
if len(matches) > 0:
trans_parent = matches[0]
else:
log.warn('{net[name]} Parent ID {msg.parent} '
'not in translation-DB (msg_id={msg_id})'
.format(net=net, msg=msg, msg_id=msg_id))
trans_id = push_rest(net=net, msg=msg, parent=trans_parent)
if trans_id is False:
log.error('{net[name]} Message not posted (msg_id={msg_id})'
.format(net=net['name'], msg_id=msg_id))
continue
if trans_id in transdb.keys():
log.error('{net[name]} trans_id={trans_id} conflicts with '
'(msg_id={msg_id})'
.format(net=net, trans_id=trans_id, msg_id=msg_id))
with queuedb:
del queuedb[msg_id]
continue
# transform, and possibly duplicate(?) message ..
with transdb, msgdb, queuedb:
transdb[trans_id] = msg_id
msg.body = u''.join((msg.body, format_origin_line()))
msgdb[msg_id] = msg
del queuedb[msg_id]
log.info('{net[name]} Published (msg_id={msg_id}) => {trans_id}'
.format(net=net, msg_id=msg_id, trans_id=trans_id))
开发者ID:hick,项目名称:x84,代码行数:58,代码来源:msgpoll.py
示例5: add_comment
def add_comment(key):
""" Prompt user to add a comment about a bbs. """
# pylint: disable=R0914
# Too many local variables.
from x84.bbs import getsession, getterminal, echo, DBProxy, LineEditor
from x84.bbs import getch
session, term = getsession(), getterminal()
prompt_comment = u'\r\n\r\nWhAt YOU GOt tO SAY? '
prompt_chg = u'\r\n\r\nChANGE EXiStiNG ? [yn] '
echo(term.move(term.height, 0))
echo(prompt_comment)
comment = LineEditor(max(10, term.width - len(prompt_comment) - 5)).read()
if comment is None or 0 == len(comment.strip()):
return
new_entry = (session.handle, comment)
comments = DBProxy('bbslist', 'comments')
comments.acquire()
existing = comments[key]
if session.handle in (_nick for (_nick, _cmt) in comments[key]):
# change existing comment,
echo(prompt_chg)
if getch() not in (u'y', u'Y'):
comments.release()
return
# re-define list without existing entry, + new entry
comments[key] = [(_enick, _ecmt) for (_enick, _ecmt) in existing
if session.handle != _enick] + [new_entry]
comments.release()
return
# re-define as existing list + new entry
comments[key] = existing + [new_entry]
comments.release()
开发者ID:jonny290,项目名称:yos-x84,代码行数:32,代码来源:bbslist.py
示例6: x84net_requeue
def x84net_requeue():
# a message failed to queue for delivery, but hellbeard
# really wanted to see em, so re-queue.
from x84.bbs import DBProxy, echo
from pprint import pformat
queuedb = DBProxy('x84netqueues')
with queuedb:
queuedb['264'] = 1
echo('-')
echo(pformat(queuedb.items()))
echo('-')
开发者ID:rostob,项目名称:x84,代码行数:11,代码来源:debug.py
示例7: maybe_expunge_records
def maybe_expunge_records():
""" Check ceiling of database keys; trim-to MAX_HISTORY. """
udb = DBProxy('oneliner')
expunged = 0
with udb:
if len(udb) > MAX_HISTORY + 10:
contents = DBProxy('oneliner').copy()
_sorted = sorted(
((value, key) for (key, value) in contents.items()),
key=lambda _valkey: keysort_by_datetime(_valkey[0]))
for expunged, (_, key) in enumerate(
_sorted[len(udb) - MAX_HISTORY:]):
del udb[key]
if expunged:
log = logging.getLogger(__name__)
log.info('expunged %d records from database', expunged)
开发者ID:rostob,项目名称:x84,代码行数:16,代码来源:ol.py
示例8: add_oneline
def add_oneline(session, message):
""" Add a oneliner to the local database. """
udb = DBProxy('oneliner')
with udb:
key = max([int(key) for key in udb.keys()] or [0]) + 1
udb[key] = {
'oneliner': message,
'alias': getsession().user.handle,
'bbsname': get_ini('system', 'bbsname'),
'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
}
maybe_expunge_records()
# tell everybody a new oneliner was posted, including our
# -- allows it to work something like a chatroom.
session.send_event('global', ('oneliner', True))
开发者ID:rostob,项目名称:x84,代码行数:16,代码来源:ol.py
示例9: tygerofdantye_fix
def tygerofdantye_fix():
""" This user was too long! """
from x84.bbs import DBProxy
user = DBProxy('userbase')['tygerofdanyte']
user.delete()
user.handle = u'tygerdanyte'
user.save()
开发者ID:rostob,项目名称:x84,代码行数:7,代码来源:debug.py
示例10: lc_retrieve
def lc_retrieve():
"""
Returns tuple of ([nicknames,] u'text'), where 'text' describes in Ansi
color the last callers to the system, and 'nicknames' is simply a list
of last callers (for lightbar selection key).
"""
# pylint: disable=R0914
# Too many local variables
from x84.bbs import get_user, ini, timeago, getterminal
from x84.bbs import DBProxy
import time
term = getterminal()
udb = DBProxy('lastcalls')
# re-order by time called; unfortunate ..; note that sqlite
# encodes unicode as utf-8; but doesn't decode it on retrieval,
# of dict keys; possible upstream patching opportunity here,
sortdb = {}
for ((handle), (tm_lc, _nc, origin)) in (udb.items()):
while tm_lc in sortdb:
tm_lc += 0.1
sortdb[tm_lc] = [handle.decode('utf-8'), _nc, origin]
padd_handle = (ini.CFG.getint('nua', 'max_user') + 2)
padd_origin = (ini.CFG.getint('nua', 'max_location') + 2)
rstr = u''
nicks = []
for tm_lc, (handle, _nc, origin) in (reversed(sorted(sortdb.items()))):
try:
is_sysop = 'sysop' in get_user(handle).groups
except KeyError:
# anonymous/deleted accts,
is_sysop = False
rstr += (term.bold_red(u'@') if is_sysop else u''
) + (term.ljust(handle,
(padd_handle - (2 if is_sysop else 1))))
rstr += term.red(origin.ljust(padd_origin))
rstr += timeago(time.time() - tm_lc)
rstr += u'\n'
nicks.append(handle)
return (nicks, rstr.rstrip())
开发者ID:jonny290,项目名称:yos-x84,代码行数:40,代码来源:lc.py
示例11: chk_thread
def chk_thread(thread):
"""
check if bbs-scene.org thread finished, if so, farm
its data and send updates via event 'oneliner_update' if there
are any.
"""
from x84.bbs import getsession, DBProxy
import logging
log = logging.getLogger(__name__)
session = getsession()
if thread is not None and not thread.is_alive():
udb = DBProxy('oneliner')
udbkeys = udb.keys()
nlc = 0
for key, value in thread.content:
if key not in udbkeys:
udb[key] = value
nlc += 1
if nlc:
log.debug('%d new entries', nlc)
session.buffer_event('oneliner_update', True)
else:
log.debug('no new %s entries'.format(thread.ident))
return True
开发者ID:hick,项目名称:x84,代码行数:24,代码来源:ol.py
示例12: rate_bbs
def rate_bbs(key):
""" Prompt user to rate a bbs. """
# pylint: disable=R0914
# Too many local variables
from x84.bbs import getsession, getterminal, echo, LineEditor, DBProxy
from x84.bbs import getch
session, term = getsession(), getterminal()
prompt_rating = u'\r\n\r\nRAtE 0.0 - 4.0: '
prompt_chg = u'\r\n\r\nChANGE EXiStiNG ? [yn] '
msg_invalid = u'\r\n\r\niNVAlid ENtRY.\r\n'
echo(term.move(term.height, 0) + '\r\n')
echo(prompt_rating)
s_rating = LineEditor(3).read()
if s_rating is None or 0 == len(s_rating.strip()):
return
try:
f_rating = float(s_rating)
except ValueError:
echo(msg_invalid)
return
if f_rating < 0 or f_rating > 4:
echo(msg_invalid)
return
entry = (session.handle, f_rating)
ratings = DBProxy('bbslist', 'ratings')
ratings.acquire()
if session.handle in (_handle for (_handle, _rating) in ratings[key]):
echo(prompt_chg)
if getch() not in (u'y', u'Y'):
ratings.release()
return
# re-define list without existing entry, + new entry
ratings[key] = [(__handle, __rating)
for (__handle, __rating) in ratings[key]
if session.handle != __handle] + [entry]
ratings.release()
return
# re-define as existing list + new entry
ratings[key] = ratings[key] + [entry]
ratings.release()
开发者ID:jonny290,项目名称:yos-x84,代码行数:40,代码来源:bbslist.py
示例13: add_oneline
def add_oneline(msg):
"""
Add a oneliner to the local database.
"""
import time
from x84.bbs import getsession, DBProxy, ini
session = getsession()
udb = DBProxy('oneliner')
udb.acquire()
udb[max([int(key) for key in udb.keys()] or [0]) + 1] = {
'oneliner': msg,
'alias': getsession().handle,
'bbsname': ini.CFG.get('system', 'bbsname'),
'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
}
udb.release()
session.buffer_event('oneliner_update', True)
session.send_event('global', ('oneliner_update', True))
开发者ID:hick,项目名称:x84,代码行数:18,代码来源:ol.py
示例14: prompt_tags
def prompt_tags(tags):
""" Prompt for and return valid tags from TAGDB. """
# pylint: disable=R0914,W0603
# Too many local variables
# Using the global statement
from x84.bbs import DBProxy, echo, getterminal, getsession
from x84.bbs import Ansi, LineEditor, getch
session, term = getsession(), getterminal()
tagdb = DBProxy('tags')
global FILTER_PRIVATE
while True:
# Accept user input for a 'search tag', or /list command
#
echo(u"\r\n\r\nENtER SEARCh %s, COMMA-dEliMitEd. " % (
term.red('TAG(s)'),))
echo(u"OR '/list', %s%s\r\n : " % (
(term.yellow_underline('^x') + u':autoscan '
if session.user.get('autoscan', False) else u''),
term.yellow_underline('^a') + u':ll msgs ' +
term.yellow_underline('Esc') + u':quit',))
width = term.width - 6
sel_tags = u', '.join(tags)
while len(Ansi(sel_tags)) >= (width - 8):
tags = tags[:-1]
sel_tags = u', '.join(tags)
lne = LineEditor(width, sel_tags)
echo(lne.refresh())
while not lne.carriage_returned:
inp = getch()
if inp in (unichr(27), term.KEY_EXIT):
return None
if inp in (unichr(24),): # ^A:all
return set()
if inp in (unichr(1),): # ^X:autoscan
return session.user.get('autoscan', set())
else:
echo(lne.process_keystroke(inp))
if lne.carriage_returned:
inp_tags = lne.content
if (inp_tags is None or 0 == len(inp_tags)
or inp_tags.strip().lower() == '/quit'):
return set()
elif inp_tags.strip().lower() == '/list':
# list all available tags, and number of messages
echo(term.normal)
echo(u'\r\n\r\nTags: \r\n')
all_tags = sorted(tagdb.items())
if 0 == len(all_tags):
echo(u'None !'.center(term.width / 2))
else:
echo(Ansi(u', '.join(([u'%s(%s)' % (
term.red(tag),
term.yellow(str(len(msgs))),)
for (tag, msgs) in all_tags]))).wrap(term.width - 2))
continue
elif (inp_tags.strip().lower() == '/nofilter'
and 'sysop' in session.user.groups):
# disable filtering private messages
FILTER_PRIVATE = False
continue
echo(u'\r\n')
# search input as valid tag(s)
tags = set([_tag.strip().lower() for _tag in inp_tags.split(',')])
for tag in tags.copy():
if not tag in tagdb:
tags.remove(tag)
echo(u"\r\nNO MESSAGES With tAG '%s' fOUNd." % (
term.red(tag),))
return tags
开发者ID:jonny290,项目名称:yos-x84,代码行数:70,代码来源:yosindex.py
示例15: main
def main():
""" Main procedure. """
# pylint: disable=R0914,R0912
# Too many local variables
# Too many branches
from x84.bbs import DBProxy, getsession, getterminal, echo
from x84.bbs import ini, LineEditor, timeago, Ansi, showcp437
from x84.bbs import disconnect, getch
import time
import os
session, term = getsession(), getterminal()
session.activity = 'logging off'
handle = session.handle if (
session.handle is not None
) else 'anonymous'
max_user = ini.CFG.getint('nua', 'max_user')
prompt_msg = u'[spnG]: ' if session.user.get('expert', False) else (
u'%s:AY SOMEthiNG %s:REViOUS %s:EXt %s:Et thE fUCk Off !\b' % (
term.bold_blue_underline(u's'), term.blue_underline(u'p'),
term.blue_underline(u'n'), term.red_underline(u'Escape/g'),))
prompt_say = u''.join((term.bold_blue(handle),
term.blue(u' SAYS WhAt'), term.bold(': '),))
boards = (('1984.ws', 'x/84 dEfAUlt bOARd', 'dingo',),
('htc.zapto.org', 'Haunting the Chapel', 'Mercyful',),
('pharcyde.ath.cx', 'Pharcyde BBS', 'Access Denied',),
('bloodisland.ph4.se', 'Blood Island', 'xzip',),
('ssl.archaicbinary.net', 'Archaic Binary', 'Wayne Smith',),
('bbs.godta.com', 'godta', 'sk-5',)
,)
board_fmt = u'%25s %-30s %-15s\r\n'
goodbye_msg = u''.join((
term.move(term.height, 0),
u'\r\n' * 10,
u'tRY ANOthER fiNE bOARd', term.bold(u':'), u'\r\n\r\n',
board_fmt % (
term.underline('host'.rjust(25)),
term.underline('board'.ljust(30)),
term.underline('sysop'.ljust(15)),),
u'\r\n'.join([board_fmt % (
term.bold(host.rjust(25)),
term.reverse(board.center(30)),
term.bold_underline(sysop),)
for (host, board, sysop) in boards]),
u'\r\n\r\n',
term.bold(
u'back to the mundane world...'),
u'\r\n',))
commit_msg = term.bold_blue(
u'-- ! thANk YOU fOR YOUR CONtRibUtiON, bROthER ! --')
write_msg = term.red_reverse(
u'bURNiNG tO ROM, PlEASE WAiT ...')
db_firstrecord = ((time.time() - 1984,
u'B. b.', u'bEhAVE YOURSElVES ...'),)
automsg_len = 40
artfile = os.path.join(os.path.dirname(__file__), 'art', '1984.asc')
def refresh_prompt(msg):
""" Refresh automsg prompt using string msg. """
echo(u''.join((u'\r\n\r\n', term.clear_eol, msg)))
def refresh_automsg(idx):
""" Refresh automsg database, display automsg of idx, return idx. """
session.flush_event('automsg')
autodb = DBProxy('automsg')
automsgs = sorted(autodb.values()) if len(autodb) else db_firstrecord
dblen = len(automsgs)
# bounds check
if idx < 0:
idx = dblen - 1
elif idx > dblen - 1:
idx = 0
tm_ago, handle, msg = automsgs[idx]
asc_ago = u'%s ago' % (timeago(time.time() - tm_ago))
disp = (u''.join(('\r\n\r\n',
term.bold(handle.rjust(max_user)),
term.bold_blue(u'/'),
term.blue(u'%*d' % (len('%d' % (dblen,)), idx,)),
term.bold_blue(u':'),
term.blue_reverse(msg.ljust(automsg_len)),
term.bold(u'\\'),
term.blue(asc_ago),)))
echo(u''.join((
u'\r\n\r\n',
Ansi(disp).wrap(term.width),
)))
return idx
def refresh_all(idx=None):
"""
refresh screen, database, and return database index
"""
echo(u''.join((u'\r\n\r\n', term.clear_eol,)))
for line in showcp437(artfile):
echo(line)
idx = refresh_automsg(-1 if idx is None else idx)
refresh_prompt(prompt_msg)
return idx
idx = refresh_all()
while True:
#.........这里部分代码省略.........
开发者ID:jonny290,项目名称:yos-x84,代码行数:101,代码来源:logoff.py
示例16: main
def main(autoscan_tags=None):
""" Main procedure. """
# pylint: disable=W0603,R0912
# Using the global statement
# Too many branches
from x84.bbs import getsession, getterminal, echo, getch
from x84.bbs import list_msgs
session, term = getsession(), getterminal()
session.activity = 'autoscan msgs'
### 尝试
session.log.info("Hick3")
### 获取所有 tag
tagdb = DBProxy('tags')
all_tags = sorted(tagdb.items())
session.log.info(all_tags)
### 尝试直接调出显示的 message ,第二个参数应该是标识为未读的
msg = new = [1, 2, 3, 4,5 ,6, 7, 8,9]
read_messages(msg, new)
return
### 首先是显示提示输入的 tag 的标签
echo(banner())
global ALREADY_READ, SEARCH_TAGS, DELETED
if autoscan_tags is not None:
SEARCH_TAGS = autoscan_tags
echo(u''.join((
term.bold_black('[ '),
term.yellow('AUtOSCAN'),
term.bold_black(' ]'), u'\r\n')))
### 默认就往这里了, 提示输入 tag
else:
# 默认 tag 为 public
SEARCH_TAGS = set(['hick3'])
# also throw in user groups, maybe the top 3 .. ?
SEARCH_TAGS.update(session.user.groups)
SEARCH_TAGS = prompt_tags(SEARCH_TAGS)
# user escape
if SEARCH_TAGS is None:
return
echo(u'\r\n\r\n%s%s ' % (
term.bold_yellow('SCANNiNG'),
term.bold_black(':'),))
echo(u','.join([term.red(tag) for tag in SEARCH_TAGS]
if 0 != len(SEARCH_TAGS) else ['<All>', ]))
### 直到有选择 tags , 保存到 session.user 中
if (SEARCH_TAGS != session.user.get('autoscan', None)):
echo(u'\r\n\r\nSave tag list as autoscan on login [yn] ?\b\b')
while True:
inp = getch()
if inp in (u'q', 'Q', unichr(27), u'n', u'N'):
break
elif inp in (u'y', u'Y'):
session.user['autoscan'] = SEARCH_TAGS
break
# retrieve all matching messages,: list_msgs 根据 tags 获得所有记录,看情形这个信息量大了有问题哈
all_msgs = list_msgs(SEARCH_TAGS)
echo(u'\r\n\r\n%s messages.' % (term.yellow_reverse(str(len(all_msgs),))))
if 0 == len(all_msgs):
getch(0.5)
return
# filter messages public/private/group-tag/new
### 分 tag 和是否未读,删除等统计
ALREADY_READ = session.user.get('readmsgs', set())
DELETED = session.user.get('trash', set())
msgs, new = msg_filter(all_msgs)
if 0 == len(msgs) and 0 == len(new):
getch(0.5)
return
# prompt read 'a'll, 'n'ew, or 'q'uit
echo(u'\r\n REAd [%s]ll %d%s message%s [qa%s] ?\b\b' % (
term.yellow_underline(u'a'),
len(msgs), (
u' or %d [%s]EW ' % (
len(new), term.yellow_underline(u'n'),)
if new else u''),
u's' if 1 != len(msgs) else u'',
u'n' if new else u'',))
while True:
inp = getch()
if inp in (u'q', 'Q', unichr(27)):
return
elif inp in (u'n', u'N') and len(new):
# read only new messages
msgs = new
break
elif inp in (u'a', u'A'):
break
# 根君上面的用户选择,读取消息, 某次记录 log msgs 和 new 都是帖子 id 的 set
# read target messages
# session.log.info(msgs)
# session.log.info(new)
read_messages(msgs, new)
开发者ID:hick,项目名称:x84,代码行数:99,代码来源:feeds.py
示例17: do_merge_shroo_ms
def do_merge_shroo_ms(new_content):
""" Add oneliners from shroo-ms to local database. """
udb = DBProxy('oneliner')
with udb:
udb.update(new_content)
maybe_expunge_records()
开发者ID:rostob,项目名称:x84,代码行数:6,代码来源:ol.py
示例18: prompt_tags
def prompt_tags(msg):
""" Prompt for and return tags wished for message. """
# pylint: disable=R0914,W0603
# Too many local variables
# Using the global statement
from x84.bbs import DBProxy, echo, getterminal, getsession
from x84.bbs import LineEditor, ini
session, term = getsession(), getterminal()
tagdb = DBProxy('tags')
# version 1.0.9 introduced new ini option; set defaults for
# those missing it from 1.0.8 upgrades.
import ConfigParser
try:
moderated_tags = ini.CFG.getboolean('msg', 'moderated_tags')
except ConfigParser.NoOptionError:
moderated_tags = False
try:
moderated_groups = set(ini.CFG.get('msg', 'tag_moderator_groups'
).split())
except ConfigParser.NoOptionError:
moderated_groups = ('sysop', 'moderator',)
msg_onlymods = (u"\r\nONlY MEMbERS Of GROUPS %s MAY CREAtE NEW tAGS." % (
", ".join(["'%s'".format(term.bold_yellow(grp)
for grp in moderated_groups)])))
msg_invalidtag = u"\r\n'%s' is not a valid tag."
prompt_tags1 = u"ENtER %s, COMMA-dEliMitEd. " % (term.bold_red('TAG(s)'),)
prompt_tags2 = u"OR '/list', %s:quit\r\n : " % (
term.bold_yellow_underline('Escape'),)
while True:
# Accept user input for multiple 'tag's, or /list command
echo(u'\r\n\r\n')
echo(prompt_tags1)
echo(prompt_tags2)
width = term.width - 6
sel_tags = u', '.join(msg.tags)
inp_tags = LineEditor(width, sel_tags).read()
if inp_tags is not None and 0 == len(inp_tags.strip()):
# no tags must be (private ..)
msg.tags = set()
return True
if inp_tags is None or inp_tags.strip().lower() == '/quit':
return False
elif inp_tags.strip().lower() == '/list':
# list all available tags, and number of messages
echo(u'\r\n\r\nTags: \r\n')
all_tags = sorted(tagdb.items())
if 0 == len(all_tags):
echo(u'None !'.center(term.width / 2))
else:
echo(u', '.join((term.wrap([u'%s(%d)' % (_key, len(_value),)
for (_key, _value) in all_tags]))
), term.width - 2)
continue
echo(u'\r\n')
# search input as valid tag(s)
tags = set([inp.strip().lower() for inp in inp_tags.split(',')])
# if the tag is new, and the user's group is not in
# tag_moderator_groups, then dissallow such tag if
# 'moderated_tags = yes' in ini cfg
if moderated_tags:
err = False
for tag in tags.copy():
if not tag in tagdb and not (
session.users.groups & moderated_groups):
tags.remove(tag)
echo(msg_invalidtag % (term.bold_red(tag),))
err = True
if err:
echo(msg_onlymods)
continue
msg.tags = tags
return True
开发者ID:signalpillar,项目名称:x84,代码行数:74,代码来源:writemsg.py
示例19: poll_network_for_messages
def poll_network_for_messages(net):
" pull for new messages of network, storing locally. "
from x84.bbs import Msg, DBProxy
from x84.bbs.msgbase import to_localtime
log = logging.getLogger(__name__)
log.debug(u'[{net[name]}] polling for new messages.'.format(net=net))
try:
last_msg_id = get_last_msg_id(net['last_file'])
except (OSError, IOError) as err:
log.error('[{net[name]}] skipping network: {err}'
.format(net=net, err=err))
return
msgs = pull_rest(net=net, last_msg_id=last_msg_id)
if msgs is not False:
log.info('{net[name]} Retrieved {num} messages'
.format(net=net, num=len(msgs)))
else:
log.debug('{net[name]} no messages.'.format(net=net))
return
transdb = DBProxy('{0}trans'.format(net['name']), use_session=False)
transkeys = transdb.keys()
msgs = sorted(msgs, cmp=lambda x, y: cmp(int(x['id']), int(y['id'])))
# store messages locally, saving their translated IDs to the transdb
for msg in msgs:
store_msg = Msg()
store_msg.recipient = msg['recipient']
store_msg.author = msg['author']
store_msg.subject = msg['subject']
store_msg.body = msg['body']
store_msg.tags = set(msg['tags'])
store_msg.tags.add(u''.join((net['name'])))
if msg['recipient'] is None and u'public' not in msg['tags']:
log.warn("{net[name]} No recipient (msg_id={msg[id]}), "
"adding 'public' tag".format(net=net, msg=msg))
store_msg.tags.add(u'public')
if (msg['parent'] is not None and
str(msg['parent']) not in transkeys):
log.warn('{net[name]} No such parent message ({msg[parent]}, '
'msg_id={msg[id]}), removing reference.'
.format(net=net, msg=msg))
elif msg['parent'] is not None:
store_msg.parent = int(transdb[msg['parent']])
if msg['id'] in transkeys:
log.warn('{net[name]} dupe (msg_id={msg[id]}) discarded.'
.format(net=net, msg=msg))
else:
# do not save this message to network, we already received
# it from the network, set send_net=False
store_msg.save(send_net=False, ctime=to_localtime(msg['ctime']))
with transdb:
transdb[msg['id']] = store_msg.idx
transkeys.append(msg['id'])
log.info('{net[name]} Processed (msg_id={msg[id]}) => {new_id}'
.format(net=net, msg=msg, new_id=store_msg.idx))
if 'last' not in net.keys() or int(net['last']) < int(msg['id']):
net['last'] = msg['id']
if 'last' in net.keys():
with open(net['last_file'], 'w') as last_fp:
last_fp.write(str(net['last']))
return
开发者ID:hick,项目名称:x84,代码行数:73,代码来源:msgpoll.py
注:本文中的x84.bbs.DBProxy类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论