• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python utils2.safe_str函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中rhodecode.lib.utils2.safe_str函数的典型用法代码示例。如果您正苦于以下问题:Python safe_str函数的具体用法?Python safe_str怎么用?Python safe_str使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了safe_str函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: differ

def differ(org_repo, org_ref, other_repo, other_ref,
           context=3, ignore_whitespace=False):
    """
    General differ between branches, bookmarks, revisions of two remote or
    local but related repositories

    :param org_repo:
    :param org_ref:
    :param other_repo:
    :type other_repo:
    :type other_ref:
    """

    org_repo_scm = org_repo.scm_instance
    other_repo_scm = other_repo.scm_instance

    org_repo = org_repo_scm._repo
    other_repo = other_repo_scm._repo

    org_ref = safe_str(org_ref[1])
    other_ref = safe_str(other_ref[1])

    if org_repo_scm == other_repo_scm:
        log.debug('running diff between %[email protected]%s and %[email protected]%s'
                  % (org_repo.path, org_ref,
                     other_repo.path, other_ref))
        _diff = org_repo_scm.get_diff(rev1=org_ref, rev2=other_ref,
            ignore_whitespace=ignore_whitespace, context=context)
        return _diff

    return '' # FIXME: when is it ever relevant to return nothing?
开发者ID:jeffjirsa,项目名称:rhodecode,代码行数:31,代码来源:diffs.py


示例2: is_valid_repos_group

def is_valid_repos_group(repos_group_name, base_path):
    """
    Returns True if given path is a repos group False otherwise

    :param repo_name:
    :param base_path:
    """
    full_path = os.path.join(safe_str(base_path), safe_str(repos_group_name))

    # check if it's not a repo
    if is_valid_repo(repos_group_name, base_path):
        return False

    try:
        # we need to check bare git repos at higher level
        # since we might match branches/hooks/info/objects or possible
        # other things inside bare git repo
        get_scm(os.path.dirname(full_path))
        return False
    except VCSError:
        pass

    # check if it's a valid path
    if os.path.isdir(full_path):
        return True

    return False
开发者ID:break123,项目名称:rhodecode,代码行数:27,代码来源:utils.py


示例3: commit_change

    def commit_change(self, repo, repo_name, cs, user, author, message,
                      content, f_path):
        """
        Commits changes

        :param repo: SCM instance

        """
        user = self._get_user(user)
        IMC = self._get_IMC_module(repo.alias)

        # decoding here will force that we have proper encoded values
        # in any other case this will throw exceptions and deny commit
        content = safe_str(content)
        path = safe_str(f_path)
        # message and author needs to be unicode
        # proper backend should then translate that into required type
        message = safe_unicode(message)
        author = safe_unicode(author)
        imc = IMC(repo)
        imc.change(FileNode(path, content, mode=cs.get_file_mode(f_path)))
        try:
            tip = imc.commit(message=message, author=author,
                             parents=[cs], branch=cs.branch)
        except Exception, e:
            log.error(traceback.format_exc())
            raise IMCCommitError(str(e))
开发者ID:adamscieszko,项目名称:rhodecode,代码行数:27,代码来源:scm.py


示例4: commit_change

    def commit_change(self, repo, repo_name, cs, user, author, message,
                      content, f_path):
        """
        Commits changes

        :param repo: SCM instance

        """
        user = self._get_user(user)
        IMC = self._get_IMC_module(repo.alias)

        # decoding here will force that we have proper encoded values
        # in any other case this will throw exceptions and deny commit
        content = safe_str(content)
        path = safe_str(f_path)
        # message and author needs to be unicode
        # proper backend should then translate that into required type
        message = safe_unicode(message)
        author = safe_unicode(author)
        imc = IMC(repo)
        imc.change(FileNode(path, content, mode=cs.get_file_mode(f_path)))
        tip = imc.commit(message=message,
                       author=author,
                       parents=[cs], branch=cs.branch)

        self.mark_for_invalidation(repo_name)
        self._handle_push(repo,
                          username=user.username,
                          action='push_local',
                          repo_name=repo_name,
                          revisions=[tip.raw_id])
        return tip
开发者ID:greenboxindonesia,项目名称:rhodecode,代码行数:32,代码来源:scm.py


示例5: commit_change

    def commit_change(self, repo, repo_name, cs, user, author, message,
                      content, f_path):
        """
        Commits changes

        :param repo: SCM instance

        """

        if repo.alias == 'hg':
            from rhodecode.lib.vcs.backends.hg import \
                MercurialInMemoryChangeset as IMC
        elif repo.alias == 'git':
            from rhodecode.lib.vcs.backends.git import \
                GitInMemoryChangeset as IMC

        # decoding here will force that we have proper encoded values
        # in any other case this will throw exceptions and deny commit
        content = safe_str(content)
        path = safe_str(f_path)
        # message and author needs to be unicode
        # proper backend should then translate that into required type
        message = safe_unicode(message)
        author = safe_unicode(author)
        m = IMC(repo)
        m.change(FileNode(path, content))
        tip = m.commit(message=message,
                       author=author,
                       parents=[cs], branch=cs.branch)

        action = 'push_local:%s' % tip.raw_id
        action_logger(user, action, repo_name)
        self.mark_for_invalidation(repo_name)
        return tip
开发者ID:yujiro,项目名称:rhodecode,代码行数:34,代码来源:scm.py


示例6: __init__

    def __init__(self, server, base_dn, port=389, bind_dn='', bind_pass='',
                 tls_kind='PLAIN', tls_reqcert='DEMAND', ldap_version=3,
                 ldap_filter='(&(objectClass=user)(!(objectClass=computer)))',
                 search_scope='SUBTREE', attr_login='uid'):
        self.ldap_version = ldap_version
        ldap_server_type = 'ldap'

        self.TLS_KIND = tls_kind

        if self.TLS_KIND == 'LDAPS':
            port = port or 689
            ldap_server_type = ldap_server_type + 's'

        OPT_X_TLS_DEMAND = 2
        self.TLS_REQCERT = getattr(ldap, 'OPT_X_TLS_%s' % tls_reqcert,
                                   OPT_X_TLS_DEMAND)
        self.LDAP_SERVER_ADDRESS = server
        self.LDAP_SERVER_PORT = port

        # USE FOR READ ONLY BIND TO LDAP SERVER
        self.LDAP_BIND_DN = safe_str(bind_dn)
        self.LDAP_BIND_PASS = safe_str(bind_pass)

        self.LDAP_SERVER = "%s://%s:%s" % (ldap_server_type,
                                           self.LDAP_SERVER_ADDRESS,
                                           self.LDAP_SERVER_PORT)

        self.BASE_DN = safe_str(base_dn)
        self.LDAP_FILTER = safe_str(ldap_filter)
        self.SEARCH_SCOPE = getattr(ldap, 'SCOPE_%s' % search_scope)
        self.attr_login = attr_login
开发者ID:yujiro,项目名称:rhodecode,代码行数:31,代码来源:auth_ldap.py


示例7: get_file_history

    def get_file_history(self, path, limit=None):
        """
        Returns history of file as reversed list of ``Changeset`` objects for
        which file at given ``path`` has been modified.

        TODO: This function now uses os underlying 'git' and 'grep' commands
        which is generally not good. Should be replaced with algorithm
        iterating commits.
        """
        self._get_filectx(path)
        cs_id = safe_str(self.id)
        f_path = safe_str(path)

        if limit:
            cmd = 'log -n %s --pretty="format: %%H" -s -p %s -- "%s"' % (
                      safe_int(limit, 0), cs_id, f_path
                   )

        else:
            cmd = 'log --pretty="format: %%H" -s -p %s -- "%s"' % (
                      cs_id, f_path
                   )
        so, se = self.repository.run_git_command(cmd)
        ids = re.findall(r'[0-9a-fA-F]{40}', so)
        return [self.repository.get_changeset(id) for id in ids]
开发者ID:jeffjirsa,项目名称:rhodecode,代码行数:25,代码来源:changeset.py


示例8: command

    def command(self):
        # get SqlAlchemy session
        self._init_session()

        repos_location = RhodeCodeUi.get_repos_location()
        to_remove = []
        for dn, dirs, f in os.walk(safe_str(repos_location)):
            alldirs = list(dirs)
            del dirs[:]
            if ".hg" in alldirs or "objects" in alldirs and ("refs" in alldirs or "packed-refs" in f):
                continue
            for loc in alldirs:
                if REMOVED_REPO_PAT.match(loc):
                    to_remove.append([os.path.join(dn, loc), self._extract_date(loc)])
                else:
                    dirs.append(loc)

        # filter older than (if present)!
        now = datetime.datetime.now()
        older_than = self.options.older_than
        if older_than:
            to_remove_filtered = []
            older_than_date = self._parse_older_than(older_than)
            for name, date_ in to_remove:
                repo_age = now - date_
                if repo_age > older_than_date:
                    to_remove_filtered.append([name, date_])

            to_remove = to_remove_filtered
            print >> sys.stdout, "removing %s deleted repos older than %s (%s)" % (
                len(to_remove),
                older_than,
                older_than_date,
            )
        else:
            print >> sys.stdout, "removing all [%s] deleted repos" % len(to_remove)
        if self.options.dont_ask or not to_remove:
            # don't ask just remove !
            remove = True
        else:
            remove = ask_ok(
                "the following repositories will be deleted completely:\n%s\n"
                "are you sure you want to remove them [y/n]?"
                % ", \n".join(["%s removed on %s" % (safe_str(x[0]), safe_str(x[1])) for x in to_remove])
            )

        if remove:
            for path, date_ in to_remove:
                print >> sys.stdout, "removing repository %s" % path
                shutil.rmtree(path)
        else:
            print "nothing done exiting..."
            sys.exit(0)
开发者ID:jeffjirsa,项目名称:rhodecode,代码行数:53,代码来源:cleanup.py


示例9: get_node

    def get_node(self, repo, path):
        """
        gets a filenode based on given full path.It operates on string for
        hg git compatability.

        :param repo: scm repo instance
        :param path: full path including root location
        :return: FileNode
        """
        root_path = safe_str(repo.path)+'/'
        parts = safe_str(path).partition(root_path)
        cs = self._get_index_changeset(repo)
        node = cs.get_node(parts[-1])
        return node
开发者ID:adamscieszko,项目名称:rhodecode,代码行数:14,代码来源:daemon.py


示例10: command

    def command(self):
        logging.config.fileConfig(self.path_to_ini_file)
        from pylons import config

        #get to remove repos !!
        add_cache(config)
        engine = engine_from_config(config, 'sqlalchemy.db1.')
        init_model(engine)

        repos_location = RhodeCodeUi.get_repos_location()
        to_remove = []
        for dn, dirs, f in os.walk(safe_str(repos_location)):
            for loc in dirs:
                if REMOVED_REPO_PAT.match(loc):
                    to_remove.append([loc, self._extract_date(loc)])

        #filter older than (if present)!
        now = datetime.datetime.now()
        older_than = self.options.older_than
        if older_than:
            to_remove_filtered = []
            older_than_date = self._parse_older_than(older_than)
            for name, date_ in to_remove:
                repo_age = now - date_
                if repo_age > older_than_date:
                    to_remove_filtered.append([name, date_])

            to_remove = to_remove_filtered
            print >> sys.stdout, 'removing [%s] deleted repos older than %s[%s]' \
                % (len(to_remove), older_than, older_than_date)
        else:
            print >> sys.stdout, 'removing all [%s] deleted repos' \
                % len(to_remove)
        if self.options.dont_ask or not to_remove:
            # don't ask just remove !
            remove = True
        else:
            remove = ask_ok('are you sure to remove listed repos \n%s [y/n]?'
                            % ', \n'.join(['%s removed on %s'
                    % (safe_str(x[0]), safe_str(x[1])) for x in to_remove]))

        if remove:
            for name, date_ in to_remove:
                print >> sys.stdout, 'removing repository %s' % name
                shutil.rmtree(os.path.join(repos_location, name))
        else:
            print 'nothing done exiting...'
            sys.exit(0)
开发者ID:break123,项目名称:rhodecode,代码行数:48,代码来源:cleanup.py


示例11: _get_scm_size

def _get_scm_size(alias, root_path):

    if not alias.startswith('.'):
        alias += '.'

    size_scm, size_root = 0, 0
    for path, dirs, files in os.walk(safe_str(root_path)):
        if path.find(alias) != -1:
            for f in files:
                try:
                    size_scm += os.path.getsize(os.path.join(path, f))
                except OSError:
                    pass
        else:
            for f in files:
                try:
                    size_root += os.path.getsize(os.path.join(path, f))
                except OSError:
                    pass

    size_scm_f = h.format_byte_size(size_scm)
    size_root_f = h.format_byte_size(size_root)
    size_total_f = h.format_byte_size(size_root + size_scm)

    return size_scm_f, size_root_f, size_total_f
开发者ID:greenboxindonesia,项目名称:rhodecode,代码行数:25,代码来源:hooks.py


示例12: gravatar_url

def gravatar_url(email_address, size=30, ssl_enabled=True):
    from pylons import url  # doh, we need to re-import url to mock it later
    from rhodecode import CONFIG

    _def = '[email protected]'  # default gravatar
    use_gravatar = str2bool(CONFIG.get('use_gravatar'))
    alternative_gravatar_url = CONFIG.get('alternative_gravatar_url', '')
    email_address = email_address or _def
    if not use_gravatar or not email_address or email_address == _def:
        f = lambda a, l: min(l, key=lambda x: abs(x - a))
        return url("/images/user%s.png" % f(size, [14, 16, 20, 24, 30]))

    if use_gravatar and alternative_gravatar_url:
        tmpl = alternative_gravatar_url
        parsed_url = urlparse.urlparse(url.current(qualified=True))
        tmpl = tmpl.replace('{email}', email_address)\
                   .replace('{md5email}', hashlib.md5(email_address.lower()).hexdigest()) \
                   .replace('{netloc}', parsed_url.netloc)\
                   .replace('{scheme}', parsed_url.scheme)\
                   .replace('{size}', str(size))
        return tmpl

    default = 'identicon'
    baseurl_nossl = "http://www.gravatar.com/avatar/"
    baseurl_ssl = "https://secure.gravatar.com/avatar/"
    baseurl = baseurl_ssl if ssl_enabled else baseurl_nossl

    if isinstance(email_address, unicode):
        #hashlib crashes on unicode items
        email_address = safe_str(email_address)
    # construct the url
    gravatar_url = baseurl + hashlib.md5(email_address.lower()).hexdigest() + "?"
    gravatar_url += urllib.urlencode({'d': default, 's': str(size)})

    return gravatar_url
开发者ID:greenboxindonesia,项目名称:rhodecode,代码行数:35,代码来源:helpers.py


示例13: _index

    def _index(self, revision, method):
        c.anchor_url = anchor_url
        c.ignorews_url = _ignorews_url
        c.context_url = _context_url
        c.fulldiff = fulldiff = request.GET.get('fulldiff')
        #get ranges of revisions if preset
        rev_range = revision.split('...')[:2]
        enable_comments = True
        try:
            if len(rev_range) == 2:
                enable_comments = False
                rev_start = rev_range[0]
                rev_end = rev_range[1]
                rev_ranges = c.rhodecode_repo.get_changesets(start=rev_start,
                                                             end=rev_end)
            else:
                rev_ranges = [c.rhodecode_repo.get_changeset(revision)]

            c.cs_ranges = list(rev_ranges)
            if not c.cs_ranges:
                raise RepositoryError('Changeset range returned empty result')

        except (RepositoryError, ChangesetDoesNotExistError, Exception), e:
            log.error(traceback.format_exc())
            h.flash(safe_str(e), category='error')
            raise HTTPNotFound()
开发者ID:adamscieszko,项目名称:rhodecode,代码行数:26,代码来源:changeset.py


示例14: __get_instance

    def __get_instance(self):

        repo_full_path = self.repo_full_path

        try:
            alias = get_scm(repo_full_path)[0]
            log.debug("Creating instance of %s repository" % alias)
            backend = get_backend(alias)
        except VCSError:
            log.error(traceback.format_exc())
            log.error(
                "Perhaps this repository is in db and not in "
                "filesystem run rescan repositories with "
                '"destroy old data " option from admin panel'
            )
            return

        if alias == "hg":

            repo = backend(safe_str(repo_full_path), create=False, baseui=self._ui)
            # skip hidden web repository
            if repo._get_hidden():
                return
        else:
            repo = backend(repo_full_path, create=False)

        return repo
开发者ID:break123,项目名称:rhodecode,代码行数:27,代码来源:db_1_2_0.py


示例15: _get_cache_parameters

def _get_cache_parameters(query):
    """For a query with cache_region and cache_namespace configured,
    return the correspoinding Cache instance and cache key, based
    on this query's current criterion and parameter values.

    """
    if not hasattr(query, '_cache_parameters'):
        raise ValueError("This Query does not have caching "
                         "parameters configured.")

    region, namespace, cache_key = query._cache_parameters

    namespace = _namespace_from_query(namespace, query)

    if cache_key is None:
        # cache key - the value arguments from this query's parameters.
        args = [safe_str(x) for x in _params_from_query(query)]
        args.extend(filter(lambda k: k not in ['None', None, u'None'],
                           [str(query._limit), str(query._offset)]))

        cache_key = " ".join(args)

    if cache_key is None:
        raise Exception('Cache key cannot be None')

    # get cache
    #cache = query.cache_manager.get_cache_region(namespace, region)
    cache = get_cache_region(namespace, region)
    # optional - hash the cache_key too for consistent length
    # import uuid
    # cache_key= str(uuid.uuid5(uuid.NAMESPACE_DNS, cache_key))

    return cache, cache_key
开发者ID:adamscieszko,项目名称:rhodecode,代码行数:33,代码来源:caching_query.py


示例16: gravatar_url

def gravatar_url(email_address, size=30):
    from pylons import url  ## doh, we need to re-import url to mock it later
    if(str2bool(config['app_conf'].get('use_gravatar')) and
       config['app_conf'].get('alternative_gravatar_url')):
        tmpl = config['app_conf'].get('alternative_gravatar_url', '')
        parsed_url = urlparse.urlparse(url.current(qualified=True))
        tmpl = tmpl.replace('{email}', email_address)\
                   .replace('{md5email}', hashlib.md5(email_address.lower()).hexdigest()) \
                   .replace('{netloc}', parsed_url.netloc)\
                   .replace('{scheme}', parsed_url.scheme)\
                   .replace('{size}', str(size))
        return tmpl

    if (not str2bool(config['app_conf'].get('use_gravatar')) or
        not email_address or email_address == '[email protected]'):
        f = lambda a, l: min(l, key=lambda x: abs(x - a))
        return url("/images/user%s.png" % f(size, [14, 16, 20, 24, 30]))

    ssl_enabled = 'https' == request.environ.get('wsgi.url_scheme')
    default = 'identicon'
    baseurl_nossl = "http://www.gravatar.com/avatar/"
    baseurl_ssl = "https://secure.gravatar.com/avatar/"
    baseurl = baseurl_ssl if ssl_enabled else baseurl_nossl

    if isinstance(email_address, unicode):
        #hashlib crashes on unicode items
        email_address = safe_str(email_address)
    # construct the url
    gravatar_url = baseurl + hashlib.md5(email_address.lower()).hexdigest() + "?"
    gravatar_url += urllib.urlencode({'d': default, 's': str(size)})

    return gravatar_url
开发者ID:yujiro,项目名称:rhodecode,代码行数:32,代码来源:helpers.py


示例17: safe_str

def safe_str(unicode_, to_encoding=None):
    """
    safe str function. Does few trick to turn unicode_ into string

    In case of UnicodeEncodeError we try to return it with encoding detected
    by chardet library if it fails fallback to string with errors replaced

    :param unicode_: unicode to encode
    :rtype: str
    :returns: str object
    """
    from rhodecode.lib.utils2 import safe_str
    return safe_str(unicode_, to_encoding)

    if isinstance(unicode_, str):
        return unicode_

    try:
        return unicode_.encode(to_encoding)
    except UnicodeEncodeError:
        pass

    try:
        import chardet
        encoding = chardet.detect(unicode_)['encoding']
        if encoding is None:
            raise UnicodeEncodeError()

        return unicode_.encode(encoding)
    except (ImportError, UnicodeEncodeError):
        return unicode_.encode(to_encoding, 'replace')

    return safe_str
开发者ID:break123,项目名称:rhodecode,代码行数:33,代码来源:__init__.py


示例18: __create_repo

    def __create_repo(self, repo_name, alias, new_parent_id, clone_uri=False):
        """
        makes repository on filesystem. It's group aware means it'll create
        a repository within a group, and alter the paths accordingly of
        group location

        :param repo_name:
        :param alias:
        :param parent_id:
        :param clone_uri:
        """
        from rhodecode.lib.utils import is_valid_repo, is_valid_repos_group

        if new_parent_id:
            paths = RepoGroup.get(new_parent_id).full_path.split(RepoGroup.url_sep())
            new_parent_path = os.sep.join(paths)
        else:
            new_parent_path = ""

        # we need to make it str for mercurial
        repo_path = os.path.join(*map(lambda x: safe_str(x), [self.repos_path, new_parent_path, repo_name]))

        # check if this path is not a repository
        if is_valid_repo(repo_path, self.repos_path):
            raise Exception("This path %s is a valid repository" % repo_path)

        # check if this path is a group
        if is_valid_repos_group(repo_path, self.repos_path):
            raise Exception("This path %s is a valid group" % repo_path)

        log.info("creating repo %s in %s @ %s" % (repo_name, safe_unicode(repo_path), clone_uri))
        backend = get_backend(alias)

        backend(repo_path, create=True, src_url=clone_uri)
开发者ID:seacoastboy,项目名称:rhodecode,代码行数:34,代码来源:repo.py


示例19: get_paths

    def get_paths(self, repo):
        """
        recursive walk in root dir and return a set of all path in that dir
        based on repository walk function
        """
        index_paths_ = set()
        try:
            cs = self._get_index_changeset(repo)
            for _topnode, _dirs, files in cs.walk('/'):
                for f in files:
                    index_paths_.add(jn(safe_str(repo.path), safe_str(f.path)))

        except RepositoryError:
            log.debug(traceback.format_exc())
            pass
        return index_paths_
开发者ID:adamscieszko,项目名称:rhodecode,代码行数:16,代码来源:daemon.py


示例20: is_valid_repo

def is_valid_repo(repo_name, base_path):
    """
    Returns True if given path is a valid repository False otherwise

    :param repo_name:
    :param base_path:

    :return True: if given path is a valid repository
    """
    full_path = os.path.join(safe_str(base_path), safe_str(repo_name))

    try:
        get_scm(full_path)
        return True
    except VCSError:
        return False
开发者ID:elfixit,项目名称:rhodecode,代码行数:16,代码来源:utils.py



注:本文中的rhodecode.lib.utils2.safe_str函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils2.safe_unicode函数代码示例发布时间:2022-05-26
下一篇:
Python utils.action_logger函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap