本文整理汇总了 Python 中 sickbeard.db.DBConnection 类的典型用法代码示例。如果您正苦于以下问题:Python DBConnection 类的具体用法?Python DBConnection 怎么用?Python DBConnection 使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了DBConnection类的12个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: overall_stats
def overall_stats():
    """
    Build library-wide counters for episodes and shows.

    Episodes are read from ``tv_episodes`` restricted to ``season > 0``,
    ``episode > 0`` and ``airdate > 1``. Each row is counted as follows:

    * status in the downloaded group (DOWNLOADED, ARCHIVED, WATCHED):
      counted as ``downloaded`` and ``total``;
    * status in the snatched group (SNATCHED, SNATCHED_PROPER):
      counted as ``snatched`` and ``total``;
    * status SKIPPED or WANTED with an air date up to and including today:
      counted as ``total`` only.

    :return: ``{'episodes': {'downloaded', 'snatched', 'total'},
        'shows': {'active', 'total'}}`` where ``active`` is the number of
        shows that are not paused and whose status is ``"Continuing"``
    """
    db = DBConnection()
    shows = sickbeard.showList

    # Keep the ordinal as an integer. The query itself compares airdate
    # numerically (``airdate > 1``); comparing those values against a str is
    # always true on Python 2 and raises TypeError on Python 3.
    today = date.today().toordinal()

    downloaded_status = Quality.DOWNLOADED + Quality.ARCHIVED + Quality.WATCHED
    snatched_status = Quality.SNATCHED + Quality.SNATCHED_PROPER
    total_status = [SKIPPED, WANTED]

    results = db.select(
        "SELECT airdate, status "
        "FROM tv_episodes "
        "WHERE season > 0 "
        "AND episode > 0 "
        "AND airdate > 1"
    )

    stats = {
        "episodes": {"downloaded": 0, "snatched": 0, "total": 0},
        "shows": {
            "active": len([show for show in shows if show.paused == 0 and show.status == "Continuing"]),
            "total": len(shows),
        },
    }
    episodes = stats["episodes"]

    for result in results:
        status = result["status"]
        if status in downloaded_status:
            episodes["downloaded"] += 1
            episodes["total"] += 1
        elif status in snatched_status:
            episodes["snatched"] += 1
            episodes["total"] += 1
        elif int(result["airdate"]) <= today and status in total_status:
            # Only episodes that have already aired count towards the total.
            episodes["total"] += 1

    return stats
开发者ID:Maximilian-Reuter,项目名称:SickRage,代码行数:32,代码来源:Show.py
示例2: find_propers
def find_propers(self, search_date=None):
    """
    Search this provider for proper releases of recently aired episodes.

    Episodes whose status is in the DOWNLOADED, SNATCHED or SNATCHED_BEST
    groups are read from the database; for each one that belongs to a show
    in ``sickbeard.showList``, the provider is searched once per entry of
    ``self.proper_strings``.

    :param search_date: Only episodes aired on or after this date are
        considered. When ``None`` no air-date lower bound is applied
        (previously ``None`` raised ``AttributeError``).
    :return: A list of ``Proper`` objects, one per search hit
    """
    results = []
    db = DBConnection()

    statuses = Quality.DOWNLOADED + Quality.SNATCHED + Quality.SNATCHED_BEST

    # Bind every value as a parameter instead of concatenating it into the
    # SQL text, matching the ``db.select(query, args)`` usage elsewhere.
    query = (
        'SELECT s.show_name, e.showid, e.season, e.episode, e.status, e.airdate'
        ' FROM tv_episodes AS e'
        ' INNER JOIN tv_shows AS s ON (e.showid = s.indexer_id)'
        ' WHERE e.status IN (' + ','.join(['?'] * len(statuses)) + ')'
    )
    params = list(statuses)
    if search_date is not None:
        query += ' AND e.airdate >= ?'
        params.append(search_date.toordinal())

    sql_results = db.select(query, params)

    for result in sql_results or []:
        show = Show.find(sickbeard.showList, int(result['showid']))
        if not show:
            # Episode row refers to a show that is not in the show list.
            continue

        episode = show.getEpisode(result['season'], result['episode'])

        for term in self.proper_strings:
            search_strings = self._get_episode_search_strings(episode, add_string=term)

            for item in self.search(search_strings[0]):
                title, url = self._get_title_and_url(item)
                results.append(Proper(title, url, datetime.today(), show))

    return results
开发者ID:Hydrog3n,项目名称:SickRage,代码行数:27,代码来源:TorrentProvider.py
示例3: overall_stats
def overall_stats():
    """
    Build library-wide counters for episodes and shows.

    Episodes are read from ``tv_episodes`` restricted to ``season > 0``,
    ``episode > 0`` and ``airdate > 1``. Each row is counted as follows:

    * status in the downloaded group (DOWNLOADED, ARCHIVED):
      counted as ``downloaded`` and ``total``;
    * status in the snatched group (SNATCHED, SNATCHED_PROPER,
      SNATCHED_BEST): counted as ``snatched`` and ``total``;
    * status SKIPPED or WANTED with an air date up to and including today:
      counted as ``total`` only.

    :return: ``{'episodes': {'downloaded', 'snatched', 'total'},
        'shows': {'active', 'total'}}`` where ``active`` is the number of
        shows that are not paused and whose status is ``'Continuing'``
    """
    db = DBConnection()
    shows = sickbeard.showList

    # Keep the ordinal as an integer. The query itself compares airdate
    # numerically (``airdate > 1``); comparing those values against a str is
    # always true on Python 2 and raises TypeError on Python 3.
    today = date.today().toordinal()

    downloaded_status = Quality.DOWNLOADED + Quality.ARCHIVED
    snatched_status = Quality.SNATCHED + Quality.SNATCHED_PROPER + Quality.SNATCHED_BEST
    total_status = [SKIPPED, WANTED]

    results = db.select(
        'SELECT airdate, status '
        'FROM tv_episodes '
        'WHERE season > 0 '
        'AND episode > 0 '
        'AND airdate > 1'
    )

    stats = {
        'episodes': {
            'downloaded': 0,
            'snatched': 0,
            'total': 0,
        },
        'shows': {
            'active': len([show for show in shows if show.paused == 0 and show.status == 'Continuing']),
            'total': len(shows),
        },
    }
    episodes = stats['episodes']

    for result in results:
        status = result[b'status']
        if status in downloaded_status:
            episodes['downloaded'] += 1
            episodes['total'] += 1
        elif status in snatched_status:
            episodes['snatched'] += 1
            episodes['total'] += 1
        elif int(result[b'airdate']) <= today and status in total_status:
            # Only episodes that have already aired count towards the total.
            episodes['total'] += 1

    return stats
开发者ID:KraXed112,项目名称:SickRage,代码行数:40,代码来源:Show.py
示例4: __init__
# Constructor: creates a DBConnection() with default arguments and stores it on the instance as ``self.db``.
def __init__(self):
self.db = DBConnection()
开发者ID:hernandito,项目名称:SickRage,代码行数:2,代码来源:History.py
示例5: find_search_results
#.........这里部分代码省略.........
u'This season result {0} is for a season we are not searching for, skipping it'.format(title),
logger.DEBUG
)
add_cache_entry = True
else:
if not all([
# pylint: disable=bad-continuation
parse_result.season_number is not None,
parse_result.episode_numbers,
[ep for ep in episodes if (ep.season, ep.scene_season)[ep.show.is_scene] ==
(parse_result.season_number, parse_result.scene_season)[ep.show.is_scene] and
(ep.episode, ep.scene_episode)[ep.show.is_scene] in parse_result.episode_numbers]
]):
logger.log(
u'The result {0} doesn\'t seem to match an episode that we are currently trying to snatch, skipping it'.format(title),
logger.DEBUG)
add_cache_entry = True
if not add_cache_entry:
actual_season = parse_result.season_number
actual_episodes = parse_result.episode_numbers
else:
same_day_special = False
if not parse_result.is_air_by_date:
logger.log(
u'This is supposed to be a date search but the result {0} didn\'t parse as one, skipping it'.format(title),
logger.DEBUG)
add_cache_entry = True
else:
air_date = parse_result.air_date.toordinal()
db = DBConnection()
sql_results = db.select(
'SELECT season, episode FROM tv_episodes WHERE showid = ? AND airdate = ?',
[show_object.indexerid, air_date]
)
if len(sql_results) == 2:
if int(sql_results[0]['season']) == 0 and int(sql_results[1]['season']) != 0:
actual_season = int(sql_results[1]['season'])
actual_episodes = [int(sql_results[1]['episode'])]
same_day_special = True
elif int(sql_results[1]['season']) == 0 and int(sql_results[0]['season']) != 0:
actual_season = int(sql_results[0]['season'])
actual_episodes = [int(sql_results[0]['episode'])]
same_day_special = True
elif len(sql_results) != 1:
logger.log(
u'Tried to look up the date for the episode {0} but the database didn\'t give proper results, skipping it'.format(title),
logger.WARNING)
add_cache_entry = True
if not add_cache_entry and not same_day_special:
actual_season = int(sql_results[0]['season'])
actual_episodes = [int(sql_results[0]['episode'])]
if add_cache_entry:
logger.log(u'Adding item from search to cache: {0}'.format(title), logger.DEBUG)
# pylint: disable=protected-access
# Access to a protected member of a client class
ci = self.cache._addCacheEntry(title, url, parse_result=parse_result)
if ci is not None:
cl.append(ci)
开发者ID:NickMolloy,项目名称:SickRage,代码行数:67,代码来源:GenericProvider.py
示例6: get_coming_episodes
def get_coming_episodes(categories, sort, group, paused=sickbeard.COMING_EPS_DISPLAY_PAUSED):
"""
:param categories: The categories of coming episodes. See ``ComingEpisodes.categories``
:param sort: The sort to apply to the coming episodes. See ``ComingEpisodes.sorts``
:param group: ``True`` to group the coming episodes by category, ``False`` otherwise
:param paused: ``True`` to include paused shows, ``False`` otherwise
:return: The list of coming episodes
"""
categories = ComingEpisodes._get_categories(categories)
sort = ComingEpisodes._get_sort(sort)
today = date.today().toordinal()
next_week = (date.today() + timedelta(days=7)).toordinal()
recently = (date.today() - timedelta(days=sickbeard.COMING_EPS_MISSED_RANGE)).toordinal()
qualities_list = Quality.DOWNLOADED + Quality.SNATCHED + Quality.SNATCHED_BEST + Quality.SNATCHED_PROPER + Quality.ARCHIVED + [IGNORED]
db = DBConnection()
fields_to_select = ', '.join(
['airdate', 'airs', 'description', 'episode', 'imdb_id', 'e.indexer', 'indexer_id', 'name', 'network',
'paused', 'quality', 'runtime', 'season', 'show_name', 'showid', 's.status']
)
results = db.select(
'SELECT %s ' % fields_to_select +
'FROM tv_episodes e, tv_shows s '
'WHERE season != 0 '
'AND airdate >= ? '
'AND airdate < ? '
'AND s.indexer_id = e.showid '
'AND e.status NOT IN (' + ','.join(['?'] * len(qualities_list)) + ')',
[today, next_week] + qualities_list
)
done_shows_list = [int(result['showid']) for result in results]
placeholder = ','.join(['?'] * len(done_shows_list))
placeholder2 = ','.join(['?'] * len(Quality.DOWNLOADED + Quality.SNATCHED + Quality.SNATCHED_BEST + Quality.SNATCHED_PROPER))
results += db.select(
'SELECT %s ' % fields_to_select +
'FROM tv_episodes e, tv_shows s '
'WHERE season != 0 '
'AND showid NOT IN (' + placeholder + ') '
'AND s.indexer_id = e.showid '
'AND airdate = (SELECT airdate '
'FROM tv_episodes inner_e '
'WHERE inner_e.season != 0 '
'AND inner_e.showid = e.showid '
'AND inner_e.airdate >= ? '
'ORDER BY inner_e.airdate ASC LIMIT 1) '
'AND e.status NOT IN (' + placeholder2 + ')',
done_shows_list + [next_week] + Quality.DOWNLOADED + Quality.SNATCHED + Quality.SNATCHED_BEST + Quality.SNATCHED_PROPER
)
results += db.select(
'SELECT %s ' % fields_to_select +
'FROM tv_episodes e, tv_shows s '
'WHERE season != 0 '
'AND s.indexer_id = e.showid '
'AND airdate < ? '
'AND airdate >= ? '
'AND e.status IN (?,?) '
'AND e.status NOT IN (' + ','.join(['?'] * len(qualities_list)) + ')',
[today, recently, WANTED, UNAIRED] + qualities_list
)
results = [dict(result) for result in results]
for index, item in enumerate(results):
results[index]['localtime'] = sbdatetime.convert_to_setting(
parse_date_time(item['airdate'], item['airs'], item['network']))
results.sort(ComingEpisodes.sorts[sort])
if not group:
return results
grouped_results = ComingEpisodes._get_categories_map(categories)
for result in results:
if result['paused'] and not paused:
continue
result['airs'] = str(result['airs']).replace('am', ' AM').replace('pm', ' PM').replace(' ', ' ')
result['airdate'] = result['localtime'].toordinal()
if result['airdate'] < today:
category = 'missed'
elif result['airdate'] >= next_week:
category = 'later'
elif result['airdate'] == today:
category = 'today'
else:
category = 'soon'
if len(categories) > 0 and category not in categories:
continue
if not result['network']:
result['network'] = ''
#.........这里部分代码省略.........
开发者ID:lastdevonearth,项目名称:SickRage,代码行数:101,代码来源:ComingEpisodes.py
示例7: get_coming_episodes
def get_coming_episodes(categories, sort, group, paused=sickbeard.COMING_EPS_DISPLAY_PAUSED):
"""
:param categories: The categories of coming episodes. See ``ComingEpisodes.categories``
:param sort: The sort to apply to the coming episodes. See ``ComingEpisodes.sorts``
:param group: ``True`` to group the coming episodes by category, ``False`` otherwise
:param paused: ``True`` to include paused shows, ``False`` otherwise
:return: The list of coming episodes
"""
if not isinstance(categories, list):
categories = categories.split("|")
if sort not in ComingEpisodes.sorts.keys():
sort = "date"
today = date.today().toordinal()
next_week = (date.today() + timedelta(days=7)).toordinal()
recently = (date.today() - timedelta(days=sickbeard.COMING_EPS_MISSED_RANGE)).toordinal()
qualities_list = (
Quality.DOWNLOADED
+ Quality.SNATCHED
+ Quality.SNATCHED_BEST
+ Quality.SNATCHED_PROPER
+ Quality.ARCHIVED
+ [IGNORED]
)
db = DBConnection()
fields_to_select = ", ".join(
[
"airdate",
"airs",
"description",
"episode",
"imdb_id",
"e.indexer",
"indexer_id",
"name",
"network",
"paused",
"quality",
"runtime",
"season",
"show_name",
"showid",
"s.status",
]
)
results = db.select(
"SELECT %s " % fields_to_select + "FROM tv_episodes e, tv_shows s "
"WHERE season != 0 "
"AND airdate >= ? "
"AND airdate < ? "
"AND s.indexer_id = e.showid "
"AND e.status NOT IN (" + ",".join(["?"] * len(qualities_list)) + ")",
[today, next_week] + qualities_list,
)
done_shows_list = [int(result[b"showid"]) for result in results]
placeholder = ",".join(["?"] * len(done_shows_list))
placeholder2 = ",".join(
["?"] * len(Quality.DOWNLOADED + Quality.SNATCHED + Quality.SNATCHED_BEST + Quality.SNATCHED_PROPER)
)
results += db.select(
"SELECT %s " % fields_to_select + "FROM tv_episodes e, tv_shows s "
"WHERE season != 0 "
"AND showid NOT IN (" + placeholder + ") "
"AND s.indexer_id = e.showid "
"AND airdate = (SELECT airdate "
"FROM tv_episodes inner_e "
"WHERE inner_e.season != 0 "
"AND inner_e.showid = e.showid "
"AND inner_e.airdate >= ? "
"ORDER BY inner_e.airdate ASC LIMIT 1) "
"AND e.status NOT IN (" + placeholder2 + ")",
done_shows_list
+ [next_week]
+ Quality.DOWNLOADED
+ Quality.SNATCHED
+ Quality.SNATCHED_BEST
+ Quality.SNATCHED_PROPER,
)
results += db.select(
"SELECT %s " % fields_to_select + "FROM tv_episodes e, tv_shows s "
"WHERE season != 0 "
"AND s.indexer_id = e.showid "
"AND airdate < ? "
"AND airdate >= ? "
"AND e.status IN (?,?) "
"AND e.status NOT IN (" + ",".join(["?"] * len(qualities_list)) + ")",
[today, recently, WANTED, UNAIRED] + qualities_list,
)
results = [dict(result) for result in results]
for index, item in enumerate(results):
results[index][b"localtime"] = sbdatetime.convert_to_setting(
parse_date_time(item[b"airdate"], item[b"airs"], item[b"network"])
#.........这里部分代码省略.........
开发者ID:coderbone,项目名称:SickRage,代码行数:101,代码来源:ComingEpisodes.py
示例8: get_coming_episodes
def get_coming_episodes(categories, sort, group, paused=sickbeard.COMING_EPS_DISPLAY_PAUSED):
    """
    Collect the next and recently missed episodes for every show in the show list.

    :param categories: Categories to keep; normalised through ``ComingEpisodes._get_categories``
    :param sort: Name of the ordering to apply; normalised through ``ComingEpisodes._get_sort``
    :param group: When falsy the flat sorted list is returned, otherwise rows are bucketed by category
    :param paused: When falsy, rows of paused shows are left out of the grouped output
    :return: The sorted list of episode rows, or the category map filled with rows when ``group`` is truthy
    """
    categories = ComingEpisodes._get_categories(categories)
    sort = ComingEpisodes._get_sort(sort)

    today = date.today().toordinal()
    recently = (date.today() - timedelta(days=sickbeard.COMING_EPS_MISSED_RANGE)).toordinal()
    next_week = (date.today() + timedelta(days=7)).toordinal()

    db = DBConnection(row_type='dict')

    columns = [
        'airdate', 'airs', 'e.description as description', 'episode', 'imdb_id', 'e.indexer', 'indexer_id',
        'name', 'network', 'paused', 'quality', 'runtime', 'season', 'show_name', 'showid', 's.status',
    ]
    fields_to_select = ', '.join(columns)

    # The statement text is the same for every show; only the bound values differ.
    statement = (
        'SELECT DISTINCT {0} '.format(fields_to_select) +
        'FROM tv_episodes e, tv_shows s '
        'WHERE showid = ? '
        'AND airdate <= ? '
        'AND airdate >= ? '
        'AND s.indexer_id = e.showid '
        'AND e.status IN (' + ','.join(['?'] * 2) + ')'
    )

    # First pass: work out which shows have a next episode and prepare one query for each.
    pending = []
    for show_obj in sickbeard.showList:
        next_air_date = show_obj.nextEpisode()
        if not next_air_date:
            continue
        pending.append([statement, [show_obj.indexerid, next_air_date, recently, WANTED, UNAIRED]])

    # Second pass: run the prepared queries and accumulate their rows.
    results = []
    for query, arguments in pending:
        rows = db.select(query, arguments)
        if results:
            results += rows
        else:
            results = rows

    for row in results:
        row[b'localtime'] = sbdatetime.convert_to_setting(
            parse_date_time(row[b'airdate'], row[b'airs'], row[b'network']))

    # Order by local air time first, then apply the requested comparison function on top.
    results.sort(key=itemgetter(b'localtime'))
    results.sort(ComingEpisodes.sorts[sort])

    if not group:
        return results

    grouped_results = ComingEpisodes._get_categories_map(categories)

    for result in results:
        if result[b'paused'] and not paused:
            continue

        result[b'airs'] = str(result[b'airs']).replace('am', ' AM').replace('pm', ' PM').replace(' ', ' ')

        air_ordinal = result[b'localtime'].toordinal()
        result[b'airdate'] = air_ordinal

        if air_ordinal < today:
            category = 'missed'
        elif air_ordinal >= next_week:
            category = 'later'
        elif air_ordinal == today:
            category = 'today'
        else:
            category = 'soon'

        if len(categories) > 0 and category not in categories:
            continue

        if not result[b'network']:
            result[b'network'] = ''

        local_time = result[b'localtime']
        result[b'quality'] = get_quality_string(result[b'quality'])
        result[b'airs'] = sbdatetime.sbftime(local_time, t_preset=timeFormat).lstrip('0').replace(' 0', ' ')
        result[b'weekday'] = 1 + local_time.weekday()
        result[b'tvdbid'] = result[b'indexer_id']
        result[b'airdate'] = sbdatetime.sbfdate(local_time, d_preset=dateFormat)
        result[b'localtime'] = local_time.toordinal()

        grouped_results[category].append(result)

    return grouped_results
开发者ID:KraXed112,项目名称:SickRage,代码行数:90,代码来源:ComingEpisodes.py
示例9: find_search_results
#.........这里部分代码省略.........
logger.DEBUG
)
add_cache_entry = True
else:
if not all([
# pylint: disable=bad-continuation
parse_result.season_number is not None,
parse_result.episode_numbers,
[ep for ep in episodes if (ep.season, ep.scene_season)[ep.show.is_scene] ==
parse_result.season_number and
(ep.episode, ep.scene_episode)[ep.show.is_scene] in
parse_result.episode_numbers]
]):
logger.log(
'The result %s doesn\'t seem to match an episode that we are currently trying to snatch, skipping it' % title,
logger.DEBUG)
add_cache_entry = True
if not add_cache_entry:
actual_season = parse_result.season_number
actual_episodes = parse_result.episode_numbers
else:
same_day_special = False
if not parse_result.is_air_by_date:
logger.log(
'This is supposed to be a date search but the result %s didn\'t parse as one, skipping it' % title,
logger.DEBUG)
add_cache_entry = True
else:
air_date = parse_result.air_date.toordinal()
db = DBConnection()
sql_results = db.select(
'SELECT season, episode FROM tv_episodes WHERE showid = ? AND airdate = ?',
[show_object.indexerid, air_date]
)
if len(sql_results) == 2:
if int(sql_results[0][b'season']) == 0 and int(sql_results[1][b'season']) != 0:
actual_season = int(sql_results[1][b'season'])
actual_episodes = [int(sql_results[1][b'episode'])]
same_day_special = True
elif int(sql_results[1][b'season']) == 0 and int(sql_results[0][b'season']) != 0:
actual_season = int(sql_results[0][b'season'])
actual_episodes = [int(sql_results[0][b'episode'])]
same_day_special = True
elif len(sql_results) != 1:
logger.log(
'Tried to look up the date for the episode %s but the database didn\'t give proper results, skipping it' % title,
logger.WARNING)
add_cache_entry = True
if not add_cache_entry and not same_day_special:
actual_season = int(sql_results[0][b'season'])
actual_episodes = [int(sql_results[0][b'episode'])]
else:
actual_season = parse_result.season_number
actual_episodes = parse_result.episode_numbers
if add_cache_entry:
logger.log('Adding item from search to cache: %s' % title, logger.DEBUG)
# pylint: disable=protected-access
# Access to a protected member of a client class
ci = self.cache._addCacheEntry(title, url, seeders, leechers, size, pubdate, torrent_hash)
开发者ID:Eiber,项目名称:SickRage-Medusa,代码行数:67,代码来源:GenericProvider.py
示例10: run
def run(self, force=False):  # pylint:disable=too-many-branches
    """
    Run the daily searcher, queuing selected episodes for search.

    Episodes with status UNAIRED whose air date has been reached are given
    the show's default episode status (SKIPPED for season 0), the changes
    are written in one ``mass_action`` call, and a ``DailySearchQueueItem``
    is queued.

    The ``amActive`` flag is always cleared on exit, including when an
    exception is raised, so one failed run cannot block later runs.

    :param force: Force search
    """
    if self.amActive:
        logger.log('Daily search is still running, not starting it again', logger.DEBUG)
        return
    elif sickbeard.forcedSearchQueueScheduler.action.is_forced_search_in_progress() and not force:
        logger.log('Manual search is running. Can\'t start Daily search', logger.WARNING)
        return

    self.amActive = True
    try:
        logger.log('Searching for newly released episodes ...')

        if not network_dict:
            update_network_dict()

        cur_time = datetime.now(sb_timezone)
        # Look one day ahead, or two when the network dictionary is still empty.
        cur_date = (
            date.today() + timedelta(days=1 if network_dict else 2)
        ).toordinal()

        main_db_con = DBConnection()
        episodes_from_db = main_db_con.select(
            b'SELECT showid, airdate, season, episode '
            b'FROM tv_episodes '
            b'WHERE status = ? AND (airdate <= ? and airdate > 1)',
            [common.UNAIRED, cur_date]
        )

        new_releases = []
        show = None

        for db_episode in episodes_from_db:
            try:
                show_id = int(db_episode[b'showid'])
                # Reuse the previously found show while consecutive rows belong to it.
                if not show or show_id != show.indexerid:
                    show = Show.find(sickbeard.showList, show_id)

                # for when there is orphaned series in the database but not loaded into our show list
                if not show or show.paused:
                    continue

            except MultipleShowObjectsException:
                logger.log('ERROR: expected to find a single show matching {id}'.format(id=show_id))
                continue

            if show.airs and show.network:
                # This is how you assure it is always converted to local time
                show_air_time = parse_date_time(db_episode[b'airdate'], show.airs, show.network)
                end_time = show_air_time.astimezone(sb_timezone) + timedelta(minutes=try_int(show.runtime, 60))

                # filter out any episodes that haven't finished airing yet,
                if end_time > cur_time:
                    continue

            cur_ep = show.get_episode(db_episode[b'season'], db_episode[b'episode'])
            with cur_ep.lock:
                cur_ep.status = show.default_ep_status if cur_ep.season else common.SKIPPED
                logger.log('Setting status ({status}) for show airing today: {name} {special}'.format(
                    name=cur_ep.pretty_name(),
                    status=common.statusStrings[cur_ep.status],
                    special='(specials are not supported)' if not cur_ep.season else ''
                ))
                new_releases.append(cur_ep.get_sql())

        if new_releases:
            main_db_con = DBConnection()
            main_db_con.mass_action(new_releases)
        else:
            logger.log('No newly released episodes found ...')

        # queue episode for daily search
        sickbeard.searchQueueScheduler.action.add_item(
            DailySearchQueueItem()
        )
    finally:
        # Always release the guard; otherwise every later call returns early.
        self.amActive = False
开发者ID:Thraxis,项目名称:pymedusa,代码行数:81,代码来源:dailysearcher.py
示例11: __init__
class History:
    """Read and maintain the rows of the ``history`` table."""

    # strftime pattern used to build the value compared with the ``date`` column
    date_format = '%Y%m%d%H%M%S'

    def __init__(self):
        """Create the database connection used by every method."""
        self.db = DBConnection()

    def clear(self):
        """Delete every row from the history table."""
        self.db.action(
            'DELETE '
            'FROM history '
            'WHERE 1 = 1'
        )

    def get(self, limit=100, action=None):
        """
        Return history entries joined with their show name, newest first.

        :param limit: Maximum number of rows to return; ``0`` disables the limit
        :param action: ``'downloaded'`` or ``'snatched'`` (case-insensitive) to
            filter on that group of actions; anything else returns both groups
        :return: A list of dicts with the keys ``action``, ``date``,
            ``episode``, ``provider``, ``quality``, ``resource``, ``season``,
            ``show_id`` and ``show_name``
        """
        action = action.lower() if isinstance(action, str) else ''
        limit = int(limit)

        if action == 'downloaded':
            actions = Quality.DOWNLOADED
        elif action == 'snatched':
            actions = Quality.SNATCHED
        else:
            actions = Quality.SNATCHED + Quality.DOWNLOADED

        # Build the statement once; the LIMIT clause is the only optional part.
        query = (
            'SELECT h.*, show_name '
            'FROM history h, tv_shows s '
            'WHERE h.showid = s.indexer_id '
            'AND action in (' + ','.join(['?'] * len(actions)) + ') '
            'ORDER BY date DESC'
        )
        params = list(actions)
        if limit != 0:
            query += ' LIMIT ?'
            params.append(limit)

        results = self.db.select(query, params)

        return [
            {
                'action': result['action'],
                'date': result['date'],
                'episode': result['episode'],
                'provider': result['provider'],
                'quality': result['quality'],
                'resource': result['resource'],
                'season': result['season'],
                'show_id': result['showid'],
                'show_name': result['show_name'],
            }
            for result in results
        ]

    def trim(self, days=30):
        """
        Delete history rows older than ``days`` days.

        :param days: Number of days of history to keep (defaults to 30)
        """
        cutoff = datetime.today() - timedelta(days=days)
        self.db.action(
            'DELETE '
            'FROM history '
            'WHERE date < ?',
            [cutoff.strftime(History.date_format)]
        )
开发者ID:OneDegreeOffCenter,项目名称:SickRage,代码行数:67,代码来源:History.py
示例12: History
class History(object):
date_format = '%Y%m%d%H%M%S'
def __init__(self):
self.db = DBConnection()
def clear(self):
"""
Clear all the history
"""
self.db.action(
'DELETE '
'FROM history '
'WHERE 1 = 1'
)
def get(self, limit=100, action=None):
"""
:param limit: The maximum number of elements to return
:param action: The type of action to filter in the history. Either 'downloaded' or 'snatched'. Anything else or
no value will return everything (up to ``limit``)
:return: The last ``limit`` elements of type ``action`` in the history
"""
# TODO: Make this a generator instead
# TODO: Split compact and detailed into separate methods
# TODO: Add a date limit as well
# TODO: Clean up history.mako
actions = History._get_actions(action)
limit = max(try_int(limit), 0)
common_sql = 'SELECT show_name, showid, season, episode, h.quality, ' \
'action, provider, resource, date ' \
'FROM history h, tv_shows s ' \
'WHERE h.showid = s.indexer_id '
filter_sql = 'AND action in (' + ','.join(['?'] * len(actions)) + ') '
order_sql = 'ORDER BY date DESC '
if actions:
sql_results = self.db.select(common_sql + filter_sql + order_sql,
actions)
else:
sql_results = self.db.select(common_sql + order_sql)
detailed = []
compact = dict()
# TODO: Convert to a defaultdict and compact items as needed
# TODO: Convert to using operators to combine items
for row in sql_results:
row = History.Item(*row)
if row.index in compact:
compact[row.index].actions.append(row.cur_action)
elif not limit or len(compact) < limit:
detailed.append(row)
compact[row.index] = row.compacted()
results = namedtuple('results', ['detailed', 'compact'])
return results(detailed, compact.values())
def trim(self, days=30):
"""
Remove expired elements from history
:param days: number of days to keep
"""
date = datetime.today() - timedelta(days)
self.db.action(
'DELETE '
'FROM history '
'WHERE date < ?',
[date.strftime(History.date_format)]
)
@staticmethod
def _get_actions(action):
action = action.lower() if isinstance(action, (str, unicode)) else ''
result = None
if action == 'downloaded':
result = Quality.DOWNLOADED
elif action == 'snatched':
result = Quality.SNATCHED
return result or []
action_fields = ('action', 'provider', 'resource', 'date', )
# A specific action from history
Action = namedtuple('Action', action_fields)
Action.width = len(action_fields)
index_fields = ('show_id', 'season', 'episode', 'quality', )
# An index for an item or compact item from history
Index = namedtuple('Index', index_fields)
Index.width = len(index_fields)
compact_fields = ('show_name', 'index', 'actions', )
# Related items compacted with a list of actions from history
CompactItem = namedtuple('CompactItem', compact_fields)
#.........这里部分代码省略.........
开发者ID:bitzorro,项目名称:SickRage,代码行数:101,代码来源:History.py
注:本文中的 sickbeard.db.DBConnection 类示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论