本文整理汇总了Python中zeroinstall.logger.debug函数的典型用法代码示例。如果您正苦于以下问题:Python debug函数的具体用法?Python debug怎么用?Python debug使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了debug函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: update_feed_from_network
def update_feed_from_network(self, feed_url, new_xml, modified_time, dry_run = False):
"""Update a cached feed.
Called by L{update_feed_if_trusted} if we trust this data.
After a successful update, L{writer} is used to update the feed's
last_checked time.
@param feed_url: the feed being updated
@type feed_url: L{model.Interface}
@param new_xml: the downloaded replacement feed document
@type new_xml: str
@param modified_time: the timestamp of the oldest trusted signature (used as an approximation to the feed's modification time)
@type modified_time: long
@type dry_run: bool
@raises ReplayAttack: if modified_time is older than the currently cached time
@since: 0.48"""
logger.debug(_("Updating '%(interface)s' from network; modified at %(time)s") %
{'interface': feed_url, 'time': _pretty_time(modified_time)})
self._import_new_feed(feed_url, new_xml, modified_time, dry_run)
if dry_run: return
feed = self.get_feed(feed_url)
from . import writer
feed.last_checked = int(time.time())
writer.save_feed(feed)
logger.info(_("Updated feed cache entry for %(interface)s (modified %(time)s)"),
{'interface': feed.get_name(), 'time': _pretty_time(modified_time)})
开发者ID:res2k,项目名称:0install,代码行数:29,代码来源:iface_cache.py
示例2: __init__
def __init__(self):
# Always add the user cache to have a reliable fallback location for storage
user_store = os.path.join(basedir.xdg_cache_home, '0install.net', 'implementations')
self.stores = [Store(user_store)]
# Add custom cache locations
dirs = []
for impl_dirs in basedir.load_config_paths('0install.net', 'injector', 'implementation-dirs'):
with open(impl_dirs, 'rt') as stream:
dirs.extend(stream.readlines())
for directory in dirs:
directory = directory.strip()
if directory and not directory.startswith('#'):
logger.debug(_("Added system store '%s'"), directory)
self.stores.append(Store(directory))
# Add the system cache when not in portable mode
if not os.environ.get('ZEROINSTALL_PORTABLE_BASE'):
if os.name == "nt":
from win32com.shell import shell, shellcon
commonAppData = shell.SHGetFolderPath(0, shellcon.CSIDL_COMMON_APPDATA, 0, 0)
systemCachePath = os.path.join(commonAppData, "0install.net", "implementations")
# Only use shared cache location on Windows if it was explicitly created
if os.path.isdir(systemCachePath):
self.stores.append(Store(systemCachePath))
else:
self.stores.append(Store('/var/cache/0install.net/implementations'))
开发者ID:AlexanderRyzhko,项目名称:0install-TUF,代码行数:27,代码来源:__init__.py
示例3: _finish
def _finish(self, status):
"""@type status: int"""
assert self.status is download_fetching
assert self.tempfile is not None
assert not self.aborted_by_user
if status == RESULT_NOT_MODIFIED:
logger.debug("%s not modified", self.url)
self.tempfile = None
self.unmodified = True
self.status = download_complete
self._final_total_size = 0
return
self._final_total_size = self.get_bytes_downloaded_so_far()
self.tempfile = None
try:
assert status == RESULT_OK
# Check that the download has the correct size, if we know what it should be.
if self.expected_size is not None:
if self._final_total_size != self.expected_size:
raise SafeException(_('Downloaded archive has incorrect size.\n'
'URL: %(url)s\n'
'Expected: %(expected_size)d bytes\n'
'Received: %(size)d bytes') % {'url': self.url, 'expected_size': self.expected_size, 'size': self._final_total_size})
except:
self.status = download_failed
raise
else:
self.status = download_complete
开发者ID:AlexanderRyzhko,项目名称:0install-TUF,代码行数:33,代码来源:download.py
示例4: __init__
def __init__(self):
user_store = os.path.join(basedir.xdg_cache_home, '0install.net', 'implementations')
self.stores = [Store(user_store)]
impl_dirs = basedir.load_first_config('0install.net', 'injector',
'implementation-dirs')
logger.debug(_("Location of 'implementation-dirs' config file being used: '%s'"), impl_dirs)
if impl_dirs:
with open(impl_dirs, 'rt') as stream:
dirs = stream.readlines()
else:
if os.name == "nt":
from win32com.shell import shell, shellcon
localAppData = shell.SHGetFolderPath(0, shellcon.CSIDL_LOCAL_APPDATA, 0, 0)
commonAppData = shell.SHGetFolderPath(0, shellcon.CSIDL_COMMON_APPDATA, 0, 0)
userCache = os.path.join(localAppData, "0install.net", "implementations")
sharedCache = os.path.join(commonAppData, "0install.net", "implementations")
dirs = [userCache, sharedCache]
else:
dirs = ['/var/cache/0install.net/implementations']
for directory in dirs:
directory = directory.strip()
if directory and not directory.startswith('#'):
logger.debug(_("Added system store '%s'"), directory)
self.stores.append(Store(directory))
开发者ID:dabrahams,项目名称:0install,代码行数:28,代码来源:__init__.py
示例5: __init__
def __init__(self, config, requirements):
"""
@param config: The configuration settings to use
@type config: L{config.Config}
@param requirements: Details about the program we want to run
@type requirements: L{requirements.Requirements}
@since: 0.53
"""
self.watchers = []
assert config
self.config = config
assert requirements
self.requirements = requirements
self.target_arch = arch.get_architecture(requirements.os, requirements.cpu)
from zeroinstall.injector.solver import DefaultSolver
self.solver = DefaultSolver(self.config)
logger.debug(_("Supported systems: '%s'"), arch.os_ranks)
logger.debug(_("Supported processors: '%s'"), arch.machine_ranks)
if requirements.before or requirements.not_before:
self.solver.extra_restrictions[config.iface_cache.get_interface(requirements.interface_uri)] = [
model.VersionRangeRestriction(
model.parse_version(requirements.before), model.parse_version(requirements.not_before)
)
]
开发者ID:timdiels,项目名称:0install,代码行数:31,代码来源:driver.py
示例6: is_stale
def is_stale(self, feed_url, freshness_threshold):
"""Check whether feed needs updating, based on the configured L{config.Config.freshness}.
None is considered to be stale.
If we already tried to update the feed within FAILED_CHECK_DELAY, returns false.
@type feed_url: str
@type freshness_threshold: int
@return: True if feed should be updated
@rtype: bool
@since: 0.53"""
if isinstance(feed_url, model.ZeroInstallFeed):
feed_url = feed_url.url # old API
elif feed_url is None:
return True # old API
now = time.time()
feed = self.get_feed(feed_url)
if feed is not None:
if feed.local_path is not None:
return False # Local feeds are never stale
if feed.last_modified is not None:
staleness = now - (feed.last_checked or 0)
logger.debug(_("Staleness for %(feed)s is %(staleness).2f hours"), {'feed': feed, 'staleness': staleness / 3600.0})
if freshness_threshold <= 0 or staleness < freshness_threshold:
return False # Fresh enough for us
# else we've never had it
last_check_attempt = self.get_last_check_attempt(feed_url)
if last_check_attempt and last_check_attempt > now - FAILED_CHECK_DELAY:
logger.debug(_("Stale, but tried to check recently (%s) so not rechecking now."), time.ctime(last_check_attempt))
return False
return True
开发者ID:res2k,项目名称:0install,代码行数:35,代码来源:iface_cache.py
示例7: download_icon
def download_icon(self, interface, force=False):
"""Download an icon for this interface and add it to the
icon cache. If the interface has no icon do nothing.
@return: the task doing the import, or None
@rtype: L{tasks.Task}"""
logger.debug("download_icon %(interface)s", {"interface": interface})
modification_time = None
existing_icon = self.config.iface_cache.get_icon_path(interface)
if existing_icon:
file_mtime = os.stat(existing_icon).st_mtime
from email.utils import formatdate
modification_time = formatdate(timeval=file_mtime, localtime=False, usegmt=True)
feed = self.config.iface_cache.get_feed(interface.uri)
if feed is None:
return None
# Find a suitable icon to download
for icon in feed.get_metadata(XMLNS_IFACE, "icon"):
type = icon.getAttribute("type")
if type != "image/png":
logger.debug(_("Skipping non-PNG icon"))
continue
source = icon.getAttribute("href")
if source:
break
logger.warn(_('Missing "href" attribute on <icon> in %s'), interface)
else:
logger.info(_("No PNG icons found in %s"), interface)
return
dl = self.download_url(source, hint=interface, modification_time=modification_time)
@tasks.async
def download_and_add_icon():
stream = dl.tempfile
try:
yield dl.downloaded
tasks.check(dl.downloaded)
if dl.unmodified:
return
stream.seek(0)
import shutil, tempfile
icons_cache = basedir.save_cache_path(config_site, "interface_icons")
tmp_file = tempfile.NamedTemporaryFile(dir=icons_cache, delete=False)
shutil.copyfileobj(stream, tmp_file)
tmp_file.close()
icon_file = os.path.join(icons_cache, escape(interface.uri))
portable_rename(tmp_file.name, icon_file)
finally:
stream.close()
return download_and_add_icon()
开发者ID:michel-slm,项目名称:0install,代码行数:59,代码来源:fetch.py
示例8: download_impls
def download_impls(self, implementations, stores):
"""Download the given implementations, choosing a suitable retrieval method for each.
If any of the retrieval methods are DistributionSources and
need confirmation, handler.confirm is called to check that the
installation should proceed.
@type implementations: [L{zeroinstall.injector.model.ZeroInstallImplementation}]
@type stores: L{zeroinstall.zerostore.Stores}
@rtype: L{zeroinstall.support.tasks.Blocker}"""
unsafe_impls = []
to_download = []
for impl in implementations:
logger.debug(_("start_downloading_impls: for %(feed)s get %(implementation)s"), {'feed': impl.feed, 'implementation': impl})
source = self.get_best_source(impl)
if not source:
raise SafeException(_("Implementation %(implementation_id)s of interface %(interface)s"
" cannot be downloaded (no download locations given in "
"interface!)") % {'implementation_id': impl.id, 'interface': impl.feed.get_name()})
to_download.append((impl, source))
if isinstance(source, DistributionSource) and source.needs_confirmation:
unsafe_impls.append(source.package_id)
@tasks.async
def download_impls():
if unsafe_impls:
confirm = self.handler.confirm_install(_('The following components need to be installed using native packages. '
'These come from your distribution, and should therefore be trustworthy, but they also '
'run with extra privileges. In particular, installing them may run extra services on your '
'computer or affect other users. You may be asked to enter a password to confirm. The '
'packages are:\n\n') + ('\n'.join('- ' + x for x in unsafe_impls)))
yield confirm
tasks.check(confirm)
blockers = []
for impl, source in to_download:
blockers.append(self.download_impl(impl, source, stores))
# Record the first error log the rest
error = []
def dl_error(ex, tb = None):
if error:
self.handler.report_error(ex)
else:
error.append((ex, tb))
while blockers:
yield blockers
tasks.check(blockers, dl_error)
blockers = [b for b in blockers if not b.happened]
if error:
from zeroinstall import support
support.raise_with_traceback(*error[0])
if not to_download:
return None
return download_impls()
开发者ID:res2k,项目名称:0install,代码行数:59,代码来源:fetch.py
示例9: recv_json
def recv_json():
logger.debug("Waiting for length...")
l = stdin.readline().strip()
logger.debug("Read '%s' from master", l)
if not l:
sys.stdout = sys.stderr
return None
return json.loads(stdin.read(int(l)).decode('utf-8'))
开发者ID:rammstein,项目名称:0install,代码行数:8,代码来源:slave.py
示例10: recv_json
def recv_json():
logger.debug("Waiting for length...")
data = read_chunk()
if not data:
sys.stdout = sys.stderr
return None
data = data.decode('utf-8')
logger.debug("Read '%s' from master", data)
return json.loads(data)
开发者ID:afb,项目名称:0install,代码行数:9,代码来源:slave.py
示例11: handle_invoke
def handle_invoke(config, options, ticket, request):
try:
command = request[0]
logger.debug("Got request '%s'", command)
if command == 'open-gui':
response = do_open_gui(request[1:])
elif command == 'run-gui':
do_run_gui(ticket)
return #async
elif command == 'wait-for-network':
response = do_wait_for_network(config)
elif command == 'check-gui':
response = do_check_gui(request[1])
elif command == 'report-error':
response = do_report_error(config, request[1])
elif command == 'gui-update-selections':
xml = qdom.parse(BytesIO(read_chunk()))
response = do_gui_update_selections(request[1:], xml)
elif command == 'download-selections':
xml = qdom.parse(BytesIO(read_chunk()))
blocker = do_download_selections(config, ticket, options, request[1:], xml)
return #async
elif command == 'import-feed':
xml = qdom.parse(BytesIO(read_chunk()))
response = do_import_feed(config, xml)
elif command == 'get-package-impls':
xml = qdom.parse(BytesIO(read_chunk()))
response = do_get_package_impls(config, options, request[1:], xml)
elif command == 'is-distro-package-installed':
xml = qdom.parse(BytesIO(read_chunk()))
response = do_is_distro_package_installed(config, options, xml)
elif command == 'get-distro-candidates':
xml = qdom.parse(BytesIO(read_chunk()))
blocker = do_get_distro_candidates(config, request[1:], xml)
reply_when_done(ticket, blocker)
return # async
elif command == 'confirm-keys':
do_confirm_keys(config, ticket, request[1:])
return # async
elif command == 'download-url':
do_download_url(config, ticket, request[1:])
return
elif command == 'notify-user':
response = do_notify_user(config, request[1])
else:
raise SafeException("Internal error: unknown command '%s'" % command)
response = ['ok', response]
except SafeException as ex:
logger.info("Replying with error: %s", ex)
response = ['error', str(ex)]
except Exception as ex:
import traceback
logger.info("Replying with error: %s", ex)
response = ['error', traceback.format_exc().strip()]
send_json(["return", ticket, response])
开发者ID:linuxmidhun,项目名称:0install,代码行数:56,代码来源:slave.py
示例12: _import_new_feed
def _import_new_feed(self, feed_url, new_xml, modified_time):
"""Write new_xml into the cache.
@param feed_url: the URL for the feed being updated
@param new_xml: the data to write
@param modified_time: when new_xml was modified
@raises ReplayAttack: if the new mtime is older than the current one
"""
assert modified_time
assert isinstance(new_xml, bytes), repr(new_xml)
upstream_dir = basedir.save_cache_path(config_site, 'interfaces')
cached = os.path.join(upstream_dir, escape(feed_url))
old_modified = None
if os.path.exists(cached):
with open(cached, 'rb') as stream:
old_xml = stream.read()
if old_xml == new_xml:
logger.debug(_("No change"))
# Update in-memory copy, in case someone else updated the disk copy
self.get_feed(feed_url, force = True)
return
old_modified = int(os.stat(cached).st_mtime)
# Do we need to write this temporary file now?
try:
with open(cached + '.new', 'wb') as stream:
stream.write(new_xml)
os.utime(cached + '.new', (modified_time, modified_time))
new_mtime = reader.check_readable(feed_url, cached + '.new')
assert new_mtime == modified_time
old_modified = self._get_signature_date(feed_url) or old_modified
if old_modified:
if new_mtime < old_modified:
raise ReplayAttack(_("New feed's modification time is "
"before old version!\nInterface: %(iface)s\nOld time: %(old_time)s\nNew time: %(new_time)s\n"
"Refusing update.")
% {'iface': feed_url, 'old_time': _pretty_time(old_modified), 'new_time': _pretty_time(new_mtime)})
if new_mtime == old_modified:
# You used to have to update the modification time manually.
# Now it comes from the signature, this check isn't useful
# and often causes problems when the stored format changes
# (e.g., when we stopped writing last-modified attributes)
pass
#raise SafeException("Interface has changed, but modification time "
# "hasn't! Refusing update.")
except:
os.unlink(cached + '.new')
raise
portable_rename(cached + '.new', cached)
logger.debug(_("Saved as %s") % cached)
self.get_feed(feed_url, force = True)
开发者ID:dabrahams,项目名称:0install,代码行数:56,代码来源:iface_cache.py
示例13: _write_store
def _write_store(self, fn):
"""Call fn(first_system_store). If it's read-only, try again with the user store."""
if len(self.stores) > 1:
try:
fn(self.get_first_system_store())
return
except NonwritableStore:
logger.debug(_("%s not-writable. Trying helper instead."), self.get_first_system_store())
pass
fn(self.stores[0], try_helper = True)
开发者ID:AlexanderRyzhko,项目名称:0install-TUF,代码行数:10,代码来源:__init__.py
示例14: _get_tar_version
def _get_tar_version():
global _tar_version
if _tar_version is None:
child = subprocess.Popen(['tar', '--version'], stdout = subprocess.PIPE,
stderr = subprocess.STDOUT, universal_newlines = True)
out, unused = child.communicate()
child.stdout.close()
child.wait()
_tar_version = out.split('\n', 1)[0]
logger.debug(_("tar version = %s"), _tar_version)
return _tar_version
开发者ID:dabrahams,项目名称:0install,代码行数:11,代码来源:unpack.py
示例15: _get_cpio_version
def _get_cpio_version():
global _cpio_version
if _cpio_version is None:
child = subprocess.Popen(
["cpio", "--version"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True
)
out, unused = child.communicate()
child.stdout.close()
child.wait()
_cpio_version = out.split("\n", 1)[0]
logger.debug(_("cpio version = %s"), _cpio_version)
return _cpio_version
开发者ID:pombredanne,项目名称:0install,代码行数:12,代码来源:unpack.py
示例16: _add_site_packages
def _add_site_packages(interface, site_packages, known_site_feeds):
for impl in os.listdir(site_packages):
if impl.startswith('.'): continue
feed = os.path.join(site_packages, impl, '0install', 'feed.xml')
if not os.path.exists(feed):
logger.warn(_("Site-local feed {path} not found").format(path = feed))
logger.debug("Adding site-local feed '%s'", feed)
# (we treat these as user overrides in order to let old versions of 0install
# find them)
interface.extra_feeds.append(Feed(feed, None, user_override = True, site_package = True))
known_site_feeds.add(feed)
开发者ID:dabrahams,项目名称:0install,代码行数:12,代码来源:reader.py
示例17: usable_feeds
def usable_feeds(self, iface, arch):
"""Generator for C{iface.feeds} that are valid for this architecture.
@type iface: L{model.Interface}
@rtype: generator
@see: L{arch}
@since: 0.53"""
for f in self.get_feed_imports(iface):
if f.os in arch.os_ranks and f.machine in arch.machine_ranks:
yield f
else:
logger.debug(_("Skipping '%(feed)s'; unsupported architecture %(os)s-%(machine)s"),
{'feed': f, 'os': f.os, 'machine': f.machine})
开发者ID:res2k,项目名称:0install,代码行数:12,代码来源:iface_cache.py
示例18: recent_gnu_tar
def recent_gnu_tar():
"""@deprecated: should be private"""
recent_gnu_tar = False
if _gnu_tar():
version = re.search(r'\)\s*(\d+(\.\d+)*)', _get_tar_version())
if version:
version = list(map(int, version.group(1).split('.')))
recent_gnu_tar = version > [1, 13, 92]
else:
logger.warn(_("Failed to extract GNU tar version number"))
logger.debug(_("Recent GNU tar = %s"), recent_gnu_tar)
return recent_gnu_tar
开发者ID:dabrahams,项目名称:0install,代码行数:12,代码来源:unpack.py
示例19: check_manifest_and_rename
def check_manifest_and_rename(self, required_digest, tmp, dry_run = False):
"""Check that tmp has the required_digest and move it into the stores. On success, tmp no longer exists.
@since: 2.3"""
if len(self.stores) > 1:
store = self.get_first_system_store()
try:
store.add_dir_to_cache(required_digest, tmp, dry_run = dry_run)
support.ro_rmtree(tmp)
return
except NonwritableStore:
logger.debug(_("%s not-writable. Trying helper instead."), store)
pass
self.stores[0].check_manifest_and_rename(required_digest, tmp, dry_run = dry_run, try_helper = True)
开发者ID:AlexanderRyzhko,项目名称:0install-TUF,代码行数:13,代码来源:__init__.py
示例20: _get_tar_version
def _get_tar_version():
"""@rtype: str"""
global _tar_version
if _tar_version is None:
child = subprocess.Popen(
["tar", "--version"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True
)
out, unused = child.communicate()
child.stdout.close()
child.wait()
_tar_version = out.split("\n", 1)[0]
logger.debug(_("tar version = %s"), _tar_version)
return _tar_version
开发者ID:pombredanne,项目名称:0install,代码行数:13,代码来源:unpack.py
注:本文中的zeroinstall.logger.debug函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论