• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python logger.debug函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中zeroinstall.logger.debug函数的典型用法代码示例。如果您正苦于以下问题:Python debug函数的具体用法?Python debug怎么用?Python debug使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了debug函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: update_feed_from_network

	def update_feed_from_network(self, feed_url, new_xml, modified_time, dry_run = False):
		"""Update a cached feed.
		Called by L{update_feed_if_trusted} if we trust this data.
		After a successful update, L{writer} is used to update the feed's
		last_checked time.
		When dry_run is true the method returns right after L{_import_new_feed},
		so last_checked is neither updated nor saved.
		@param feed_url: the feed being updated
		@type feed_url: L{model.Interface}
		@param new_xml: the downloaded replacement feed document
		@type new_xml: str
		@param modified_time: the timestamp of the oldest trusted signature (used as an approximation to the feed's modification time)
		@type modified_time: long
		@param dry_run: passed on to L{_import_new_feed}; also skips the last_checked update
		@type dry_run: bool
		@raises ReplayAttack: if modified_time is older than the currently cached time
		@since: 0.48"""
		# Pass the mapping as a logging argument (as the info() call below does) so
		# that the message is only formatted if debug output is actually enabled.
		logger.debug(_("Updating '%(interface)s' from network; modified at %(time)s"),
			{'interface': feed_url, 'time': _pretty_time(modified_time)})

		self._import_new_feed(feed_url, new_xml, modified_time, dry_run)

		if dry_run:
			return

		feed = self.get_feed(feed_url)

		from . import writer
		feed.last_checked = int(time.time())
		writer.save_feed(feed)

		logger.info(_("Updated feed cache entry for %(interface)s (modified %(time)s)"),
			{'interface': feed.get_name(), 'time': _pretty_time(modified_time)})
开发者ID:res2k,项目名称:0install,代码行数:29,代码来源:iface_cache.py


示例2: __init__

	def __init__(self):
		"""Build self.stores, the ordered list of implementation stores.
		The user cache always comes first. It is followed by every directory
		listed in the 'implementation-dirs' files yielded by
		basedir.load_config_paths, and finally by the system cache unless the
		ZEROINSTALL_PORTABLE_BASE environment variable is set."""
		# Always add the user cache to have a reliable fallback location for storage
		self.stores = [Store(os.path.join(basedir.xdg_cache_home, '0install.net', 'implementations'))]

		# Add custom cache locations: read every config file first, then add the stores
		configured = []
		for config_path in basedir.load_config_paths('0install.net', 'injector', 'implementation-dirs'):
			with open(config_path, 'rt') as stream:
				configured += stream.readlines()

		for line in configured:
			path = line.strip()
			if not path or path.startswith('#'):
				continue	# blank line or comment
			logger.debug(_("Added system store '%s'"), path)
			self.stores.append(Store(path))

		# The system cache is not used in portable mode
		if os.environ.get('ZEROINSTALL_PORTABLE_BASE'):
			return

		if os.name != "nt":
			self.stores.append(Store('/var/cache/0install.net/implementations'))
			return

		from win32com.shell import shell, shellcon
		common_app_data = shell.SHGetFolderPath(0, shellcon.CSIDL_COMMON_APPDATA, 0, 0)
		system_cache = os.path.join(common_app_data, "0install.net", "implementations")
		# Only use shared cache location on Windows if it was explicitly created
		if os.path.isdir(system_cache):
			self.stores.append(Store(system_cache))
开发者ID:AlexanderRyzhko,项目名称:0install-TUF,代码行数:27,代码来源:__init__.py


示例3: _finish

	def _finish(self, status):
		"""Record the outcome of a fetch and move the download to a final state.
		For RESULT_NOT_MODIFIED the download is marked complete and unmodified
		with a total size of zero. Otherwise the number of bytes received is
		recorded and checked against expected_size (when that is known).
		@type status: int
		@raise SafeException: if expected_size is set and differs from the size received"""
		assert self.status is download_fetching
		assert self.tempfile is not None
		assert not self.aborted_by_user

		if status != RESULT_NOT_MODIFIED:
			self._final_total_size = self.get_bytes_downloaded_so_far()
			self.tempfile = None

			try:
				assert status == RESULT_OK

				# Check that the download has the correct size, if we know what it should be.
				size_known = self.expected_size is not None
				if size_known and self._final_total_size != self.expected_size:
					template = _('Downloaded archive has incorrect size.\n'
							'URL: %(url)s\n'
							'Expected: %(expected_size)d bytes\n'
							'Received: %(size)d bytes')
					raise SafeException(template % {'url': self.url, 'expected_size': self.expected_size, 'size': self._final_total_size})
			except:
				# Anything going wrong here (including the assertion) fails the download
				self.status = download_failed
				raise

			self.status = download_complete
			return

		# Nothing new was fetched
		logger.debug("%s not modified", self.url)
		self.tempfile = None
		self.unmodified = True
		self.status = download_complete
		self._final_total_size = 0
开发者ID:AlexanderRyzhko,项目名称:0install-TUF,代码行数:33,代码来源:download.py


示例4: __init__

	def __init__(self):
		"""Build self.stores, the ordered list of implementation stores.
		The user cache is always first. Further stores come from the first
		'implementation-dirs' config file found by basedir.load_first_config;
		if there is none, platform defaults are used instead."""
		self.stores = [Store(os.path.join(basedir.xdg_cache_home, '0install.net', 'implementations'))]

		config_file = basedir.load_first_config('0install.net', 'injector',
							  'implementation-dirs')
		logger.debug(_("Location of 'implementation-dirs' config file being used: '%s'"), config_file)

		if config_file:
			with open(config_file, 'rt') as stream:
				candidates = stream.readlines()
		elif os.name == "nt":
			from win32com.shell import shell, shellcon
			local_app_data = shell.SHGetFolderPath(0, shellcon.CSIDL_LOCAL_APPDATA, 0, 0)
			common_app_data = shell.SHGetFolderPath(0, shellcon.CSIDL_COMMON_APPDATA, 0, 0)

			candidates = [
				os.path.join(local_app_data, "0install.net", "implementations"),
				os.path.join(common_app_data, "0install.net", "implementations"),
			]
		else:
			candidates = ['/var/cache/0install.net/implementations']

		for entry in candidates:
			path = entry.strip()
			if not path or path.startswith('#'):
				continue	# blank line or comment
			logger.debug(_("Added system store '%s'"), path)
			self.stores.append(Store(path))
开发者ID:dabrahams,项目名称:0install,代码行数:28,代码来源:__init__.py


示例5: __init__

    def __init__(self, config, requirements):
        """Set up the driver: store the settings, pick the target architecture
        and create the solver. If the requirements give a "before" or
        "not_before" version, a version range restriction is registered with
        the solver for the requested interface.
        @param config: The configuration settings to use
        @type config: L{config.Config}
        @param requirements: Details about the program we want to run
        @type requirements: L{requirements.Requirements}
        @since: 0.53
        """
        self.watchers = []

        assert config
        self.config = config

        assert requirements
        self.requirements = requirements

        self.target_arch = arch.get_architecture(requirements.os, requirements.cpu)

        from zeroinstall.injector.solver import DefaultSolver

        self.solver = DefaultSolver(self.config)

        logger.debug(_("Supported systems: '%s'"), arch.os_ranks)
        logger.debug(_("Supported processors: '%s'"), arch.machine_ranks)

        if not (requirements.before or requirements.not_before):
            return  # no version limits requested

        version_limits = model.VersionRangeRestriction(
            model.parse_version(requirements.before),
            model.parse_version(requirements.not_before)
        )
        root_iface = config.iface_cache.get_interface(requirements.interface_uri)
        self.solver.extra_restrictions[root_iface] = [version_limits]
开发者ID:timdiels,项目名称:0install,代码行数:31,代码来源:driver.py


示例6: is_stale

	def is_stale(self, feed_url, freshness_threshold):
		"""Check whether feed needs updating, based on the configured L{config.Config.freshness}.
		None is considered to be stale.
		Local feeds are never stale, and a freshness_threshold of zero or less
		means a cached feed with a known modification time is never stale.
		If we already tried to update the feed within FAILED_CHECK_DELAY, returns false.
		@type feed_url: str
		@type freshness_threshold: int
		@return: True if feed should be updated
		@rtype: bool
		@since: 0.53"""
		if feed_url is None:
			return True			# old API
		if isinstance(feed_url, model.ZeroInstallFeed):
			feed_url = feed_url.url		# old API

		checked_at = time.time()

		cached = self.get_feed(feed_url)
		if cached is not None:
			if cached.local_path is not None:
				return False		# Local feeds are never stale

			if cached.last_modified is not None:
				age = checked_at - (cached.last_checked or 0)
				logger.debug(_("Staleness for %(feed)s is %(staleness).2f hours"), {'feed': cached, 'staleness': age / 3600.0})

				fresh_enough = freshness_threshold <= 0 or age < freshness_threshold
				if fresh_enough:
					return False
		# else we've never had it

		last_attempt = self.get_last_check_attempt(feed_url)
		tried_recently = last_attempt and last_attempt > checked_at - FAILED_CHECK_DELAY
		if not tried_recently:
			return True

		logger.debug(_("Stale, but tried to check recently (%s) so not rechecking now."), time.ctime(last_attempt))
		return False
开发者ID:res2k,项目名称:0install,代码行数:35,代码来源:iface_cache.py


示例7: download_icon

    def download_icon(self, interface, force=False):
        """Download an icon for this interface and add it to the
        icon cache. If the interface has no icon do nothing.
        If an icon is already cached, its modification time is passed to
        download_url; when the download then reports "unmodified" the cached
        copy is left alone.
        @param interface: the interface whose icon is wanted
        @param force: not referenced by this method
        @return: the task doing the import, or None
        @rtype: L{tasks.Task}"""
        logger.debug("download_icon %(interface)s", {"interface": interface})

        modification_time = None
        existing_icon = self.config.iface_cache.get_icon_path(interface)
        if existing_icon:
            file_mtime = os.stat(existing_icon).st_mtime
            from email.utils import formatdate

            modification_time = formatdate(timeval=file_mtime, localtime=False, usegmt=True)

        feed = self.config.iface_cache.get_feed(interface.uri)
        if feed is None:
            return None

        # Find a suitable icon to download
        for icon in feed.get_metadata(XMLNS_IFACE, "icon"):
            icon_type = icon.getAttribute("type")
            if icon_type != "image/png":
                logger.debug(_("Skipping non-PNG icon"))
                continue
            source = icon.getAttribute("href")
            if source:
                break
            logger.warning(_('Missing "href" attribute on <icon> in %s'), interface)
        else:
            logger.info(_("No PNG icons found in %s"), interface)
            return None

        dl = self.download_url(source, hint=interface, modification_time=modification_time)

        def download_and_add_icon():
            stream = dl.tempfile
            try:
                yield dl.downloaded
                tasks.check(dl.downloaded)
                if dl.unmodified:
                    return
                stream.seek(0)

                import shutil
                import tempfile

                icons_cache = basedir.save_cache_path(config_site, "interface_icons")

                # Write to a temporary file in the same directory, then rename it
                # over the final name.
                tmp_file = tempfile.NamedTemporaryFile(dir=icons_cache, delete=False)
                shutil.copyfileobj(stream, tmp_file)
                tmp_file.close()

                icon_file = os.path.join(icons_cache, escape(interface.uri))
                portable_rename(tmp_file.name, icon_file)
            finally:
                stream.close()

        # "async" is a reserved keyword since Python 3.7, so "@tasks.async" no
        # longer parses; fetch the decorator by name instead.
        download_and_add_icon = getattr(tasks, "async")(download_and_add_icon)

        return download_and_add_icon()
开发者ID:michel-slm,项目名称:0install,代码行数:59,代码来源:fetch.py


示例8: download_impls

	def download_impls(self, implementations, stores):
		"""Download the given implementations, choosing a suitable retrieval method for each.
		If any of the retrieval methods are DistributionSources and
		need confirmation, handler.confirm is called to check that the
		installation should proceed.
		If several downloads fail, the first error is raised once all of them
		have finished and the others are passed to handler.report_error.
		@type implementations: [L{zeroinstall.injector.model.ZeroInstallImplementation}]
		@type stores: L{zeroinstall.zerostore.Stores}
		@return: a blocker for the whole operation, or None if implementations is empty
		@rtype: L{zeroinstall.support.tasks.Blocker}
		@raise SafeException: if an implementation has no usable download source"""
		unsafe_impls = []

		to_download = []
		for impl in implementations:
			logger.debug(_("start_downloading_impls: for %(feed)s get %(implementation)s"), {'feed': impl.feed, 'implementation': impl})
			source = self.get_best_source(impl)
			if not source:
				raise SafeException(_("Implementation %(implementation_id)s of interface %(interface)s"
					" cannot be downloaded (no download locations given in "
					"interface!)") % {'implementation_id': impl.id, 'interface': impl.feed.get_name()})
			to_download.append((impl, source))

			if isinstance(source, DistributionSource) and source.needs_confirmation:
				unsafe_impls.append(source.package_id)

		def download_impls():
			if unsafe_impls:
				confirm = self.handler.confirm_install(_('The following components need to be installed using native packages. '
					'These come from your distribution, and should therefore be trustworthy, but they also '
					'run with extra privileges. In particular, installing them may run extra services on your '
					'computer or affect other users. You may be asked to enter a password to confirm. The '
					'packages are:\n\n') + ('\n'.join('- ' + x for x in unsafe_impls)))
				yield confirm
				tasks.check(confirm)

			blockers = [self.download_impl(impl, source, stores) for impl, source in to_download]

			# Record the first error; report the rest to the handler
			error = []
			def dl_error(ex, tb = None):
				if error:
					self.handler.report_error(ex)
				else:
					error.append((ex, tb))
			while blockers:
				yield blockers
				tasks.check(blockers, dl_error)

				# Keep waiting only for the downloads that have not finished yet
				blockers = [b for b in blockers if not b.happened]
			if error:
				from zeroinstall import support
				support.raise_with_traceback(*error[0])

		# "async" is a reserved keyword since Python 3.7, so "@tasks.async" no
		# longer parses; fetch the decorator by name instead.
		download_impls = getattr(tasks, "async")(download_impls)

		if not to_download:
			return None

		return download_impls()
开发者ID:res2k,项目名称:0install,代码行数:59,代码来源:fetch.py


示例9: recv_json

def recv_json():
	"""Read one length-prefixed JSON message from stdin.
	A line holding the length is read first; then that many units are read
	from stdin, decoded as UTF-8 and parsed as JSON.
	@return: the parsed value, or None if the length line was empty (in that
	case sys.stdout is also redirected to sys.stderr)"""
	logger.debug("Waiting for length...")
	length_line = stdin.readline().strip()
	logger.debug("Read '%s' from master", length_line)
	if length_line:
		payload = stdin.read(int(length_line))
		return json.loads(payload.decode('utf-8'))

	# Empty length line: nothing more to read
	sys.stdout = sys.stderr
	return None
开发者ID:rammstein,项目名称:0install,代码行数:8,代码来源:slave.py


示例10: recv_json

def recv_json():
	"""Read one chunk with read_chunk() and parse it as UTF-8 encoded JSON.
	@return: the parsed value, or None if the chunk was empty (in that case
	sys.stdout is also redirected to sys.stderr)"""
	logger.debug("Waiting for length...")
	raw = read_chunk()
	if raw:
		text = raw.decode('utf-8')
		logger.debug("Read '%s' from master", text)
		return json.loads(text)

	# Empty chunk: nothing more to read
	sys.stdout = sys.stderr
	return None
开发者ID:afb,项目名称:0install,代码行数:9,代码来源:slave.py


示例11: handle_invoke

def handle_invoke(config, options, ticket, request):
	"""Run the command named by request[0] and send back its result.
	The remaining items of request are the command's arguments; some commands
	are also followed by an XML document, which is read with read_chunk().
	Synchronous commands are answered here with ["return", ticket, ['ok', result]].
	Commands marked "async" below are handed the ticket and this function
	returns without replying. Any exception is turned into an
	['error', message] reply: the plain message for a SafeException, the
	full traceback otherwise.
	@param config: passed through to the command implementations
	@param options: passed through to the command implementations
	@param ticket: identifies the request in the reply
	@param request: list whose first item is the command name"""
	def read_xml():
		# The XML argument arrives as a separate chunk after the request itself
		return qdom.parse(BytesIO(read_chunk()))

	try:
		command = request[0]
		logger.debug("Got request '%s'", command)
		if command == 'open-gui':
			response = do_open_gui(request[1:])
		elif command == 'run-gui':
			do_run_gui(ticket)
			return #async
		elif command == 'wait-for-network':
			response = do_wait_for_network(config)
		elif command == 'check-gui':
			response = do_check_gui(request[1])
		elif command == 'report-error':
			response = do_report_error(config, request[1])
		elif command == 'gui-update-selections':
			xml = read_xml()
			response = do_gui_update_selections(request[1:], xml)
		elif command == 'download-selections':
			xml = read_xml()
			do_download_selections(config, ticket, options, request[1:], xml)
			return #async
		elif command == 'import-feed':
			xml = read_xml()
			response = do_import_feed(config, xml)
		elif command == 'get-package-impls':
			xml = read_xml()
			response = do_get_package_impls(config, options, request[1:], xml)
		elif command == 'is-distro-package-installed':
			xml = read_xml()
			response = do_is_distro_package_installed(config, options, xml)
		elif command == 'get-distro-candidates':
			xml = read_xml()
			blocker = do_get_distro_candidates(config, request[1:], xml)
			reply_when_done(ticket, blocker)
			return	# async
		elif command == 'confirm-keys':
			do_confirm_keys(config, ticket, request[1:])
			return	# async
		elif command == 'download-url':
			do_download_url(config, ticket, request[1:])
			return	# presumably async like the other ticket-taking commands
		elif command == 'notify-user':
			response = do_notify_user(config, request[1])
		else:
			raise SafeException("Internal error: unknown command '%s'" % command)
		response = ['ok', response]
	except SafeException as ex:
		logger.info("Replying with error: %s", ex)
		response = ['error', str(ex)]
	except Exception as ex:
		import traceback
		logger.info("Replying with error: %s", ex)
		response = ['error', traceback.format_exc().strip()]

	send_json(["return", ticket, response])
开发者ID:linuxmidhun,项目名称:0install,代码行数:56,代码来源:slave.py


示例12: _import_new_feed

	def _import_new_feed(self, feed_url, new_xml, modified_time):
		"""Write new_xml into the cache.
		The data is first written to a '.new' file next to the cached copy and
		only renamed over it once the checks have passed. If anything fails,
		the '.new' file is removed (if it was created) and the error re-raised.
		If the cached copy already has identical contents, only the in-memory
		feed is reloaded.
		@param feed_url: the URL for the feed being updated
		@param new_xml: the data to write
		@type new_xml: bytes
		@param modified_time: when new_xml was modified
		@raises ReplayAttack: if the new mtime is older than the current one
		"""
		assert modified_time
		assert isinstance(new_xml, bytes), repr(new_xml)

		upstream_dir = basedir.save_cache_path(config_site, 'interfaces')
		cached = os.path.join(upstream_dir, escape(feed_url))
		new_path = cached + '.new'

		old_modified = None
		if os.path.exists(cached):
			with open(cached, 'rb') as stream:
				old_xml = stream.read()
			if old_xml == new_xml:
				logger.debug(_("No change"))
				# Update in-memory copy, in case someone else updated the disk copy
				self.get_feed(feed_url, force = True)
				return
			old_modified = int(os.stat(cached).st_mtime)

		# Do we need to write this temporary file now?
		try:
			with open(new_path, 'wb') as stream:
				stream.write(new_xml)
			os.utime(new_path, (modified_time, modified_time))
			new_mtime = reader.check_readable(feed_url, new_path)
			assert new_mtime == modified_time

			# Prefer the date of the stored signature over the file's mtime
			old_modified = self._get_signature_date(feed_url) or old_modified

			if old_modified and new_mtime < old_modified:
				raise ReplayAttack(_("New feed's modification time is "
					"before old version!\nInterface: %(iface)s\nOld time: %(old_time)s\nNew time: %(new_time)s\n"
					"Refusing update.")
					% {'iface': feed_url, 'old_time': _pretty_time(old_modified), 'new_time': _pretty_time(new_mtime)})
			# An unchanged modification time is deliberately accepted.
			# You used to have to update the modification time manually.
			# Now it comes from the signature, that check isn't useful
			# and often causes problems when the stored format changes
			# (e.g., when we stopped writing last-modified attributes)
		except:
			# The temporary file may not exist (e.g. if opening it failed); an
			# unconditional unlink would then raise and hide the real error.
			if os.path.exists(new_path):
				os.unlink(new_path)
			raise

		portable_rename(new_path, cached)
		logger.debug(_("Saved as %s"), cached)

		self.get_feed(feed_url, force = True)
开发者ID:dabrahams,项目名称:0install,代码行数:56,代码来源:iface_cache.py


示例13: _write_store

	def _write_store(self, fn):
		"""Call fn(first_system_store). If it's read-only, try again with the user store.
		The retry (also used when there is only one store) calls
		fn(self.stores[0], try_helper = True).
		@param fn: callable taking a store and, optionally, a try_helper keyword argument"""
		have_system_store = len(self.stores) > 1
		if have_system_store:
			try:
				fn(self.get_first_system_store())
			except NonwritableStore:
				logger.debug(_("%s not-writable. Trying helper instead."), self.get_first_system_store())
			else:
				return
		fn(self.stores[0], try_helper = True)
开发者ID:AlexanderRyzhko,项目名称:0install-TUF,代码行数:10,代码来源:__init__.py


示例14: _get_tar_version

def _get_tar_version():
	"""Return the first line printed by 'tar --version'.
	The command is only run once; its result is kept in the module-level
	_tar_version and reused by later calls."""
	global _tar_version
	if _tar_version is not None:
		return _tar_version

	tar = subprocess.Popen(['tar', '--version'], stdout = subprocess.PIPE,
				stderr = subprocess.STDOUT, universal_newlines = True)
	output, ignored = tar.communicate()
	tar.stdout.close()
	tar.wait()

	# Only the first line of the output is kept
	_tar_version = output.partition('\n')[0]
	logger.debug(_("tar version = %s"), _tar_version)
	return _tar_version
开发者ID:dabrahams,项目名称:0install,代码行数:11,代码来源:unpack.py


示例15: _get_cpio_version

def _get_cpio_version():
    """Return the first line printed by 'cpio --version'.
    The command is only run once; its result is kept in the module-level
    _cpio_version and reused by later calls."""
    global _cpio_version
    if _cpio_version is not None:
        return _cpio_version

    command = ["cpio", "--version"]
    cpio = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
    output, ignored = cpio.communicate()
    cpio.stdout.close()
    cpio.wait()

    # Only the first line of the output is kept
    _cpio_version = output.partition("\n")[0]
    logger.debug(_("cpio version = %s"), _cpio_version)
    return _cpio_version
开发者ID:pombredanne,项目名称:0install,代码行数:12,代码来源:unpack.py


示例16: _add_site_packages

def _add_site_packages(interface, site_packages, known_site_feeds):
	"""Register the feed of every package found in a site-packages directory.
	For each entry of site_packages whose name does not start with '.', the
	path <site_packages>/<entry>/0install/feed.xml is appended to
	interface.extra_feeds and added to known_site_feeds.
	@param interface: object whose extra_feeds list is extended
	@param site_packages: path of the directory to scan
	@param known_site_feeds: set that collects the feed paths"""
	for impl in os.listdir(site_packages):
		if impl.startswith('.'):
			continue
		feed = os.path.join(site_packages, impl, '0install', 'feed.xml')
		if not os.path.exists(feed):
			# NOTE(review): a missing feed is only warned about and is still
			# registered below - confirm that this is intended.
			logger.warning(_("Site-local feed {path} not found").format(path = feed))
		logger.debug("Adding site-local feed '%s'", feed)

		# (we treat these as user overrides in order to let old versions of 0install
		# find them)
		interface.extra_feeds.append(Feed(feed, None, user_override = True, site_package = True))
		known_site_feeds.add(feed)
开发者ID:dabrahams,项目名称:0install,代码行数:12,代码来源:reader.py


示例17: usable_feeds

	def usable_feeds(self, iface, arch):
		"""Generator for C{iface.feeds} that are valid for this architecture.
		A feed import is yielded when its os is in arch.os_ranks and its machine
		is in arch.machine_ranks; any other feed is skipped with a debug message.
		@type iface: L{model.Interface}
		@rtype: generator
		@see: L{arch}
		@since: 0.53"""
		for candidate in self.get_feed_imports(iface):
			supported = candidate.os in arch.os_ranks and candidate.machine in arch.machine_ranks
			if not supported:
				logger.debug(_("Skipping '%(feed)s'; unsupported architecture %(os)s-%(machine)s"),
					{'feed': candidate, 'os': candidate.os, 'machine': candidate.machine})
				continue
			yield candidate
开发者ID:res2k,项目名称:0install,代码行数:12,代码来源:iface_cache.py


示例18: recent_gnu_tar

def recent_gnu_tar():
	"""Report whether tar is GNU tar with a version newer than 1.13.92.
	Returns False if _gnu_tar() is false or if no version number can be
	found in the output of _get_tar_version().
	@rtype: bool
	@deprecated: should be private"""
	# (the result is kept in a local that does not reuse the function's own name)
	is_recent = False
	if _gnu_tar():
		# The version number is expected right after a closing parenthesis
		match = re.search(r'\)\s*(\d+(\.\d+)*)', _get_tar_version())
		if match:
			version = [int(part) for part in match.group(1).split('.')]
			is_recent = version > [1, 13, 92]
		else:
			logger.warning(_("Failed to extract GNU tar version number"))
	logger.debug(_("Recent GNU tar = %s"), is_recent)
	return is_recent
开发者ID:dabrahams,项目名称:0install,代码行数:12,代码来源:unpack.py


示例19: check_manifest_and_rename

	def check_manifest_and_rename(self, required_digest, tmp, dry_run = False):
		"""Check that tmp has the required_digest and move it into the stores. On success, tmp no longer exists.
		With more than one store, the first system store is tried first and tmp
		is removed afterwards. If that store is not writable (or there is only
		one store), the work is passed to self.stores[0] with try_helper = True.
		@since: 2.3"""
		if len(self.stores) > 1:
			system_store = self.get_first_system_store()
			try:
				system_store.add_dir_to_cache(required_digest, tmp, dry_run = dry_run)
				support.ro_rmtree(tmp)
			except NonwritableStore:
				logger.debug(_("%s not-writable. Trying helper instead."), system_store)
			else:
				return
		self.stores[0].check_manifest_and_rename(required_digest, tmp, dry_run = dry_run, try_helper = True)
开发者ID:AlexanderRyzhko,项目名称:0install-TUF,代码行数:13,代码来源:__init__.py


示例20: _get_tar_version

def _get_tar_version():
    """Return the first line printed by 'tar --version'.
    The command is only run once; its result is kept in the module-level
    _tar_version and reused by later calls.
    @rtype: str"""
    global _tar_version
    if _tar_version is not None:
        return _tar_version

    command = ["tar", "--version"]
    tar = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
    output, ignored = tar.communicate()
    tar.stdout.close()
    tar.wait()

    # Only the first line of the output is kept
    _tar_version = output.partition("\n")[0]
    logger.debug(_("tar version = %s"), _tar_version)
    return _tar_version
开发者ID:pombredanne,项目名称:0install,代码行数:13,代码来源:unpack.py



注:本文中的zeroinstall.logger.debug函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python logger.info函数代码示例发布时间:2022-05-26
下一篇:
Python requirements.Requirements类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap