本文整理汇总了Python中rhodecode.lib.utils2.safe_str函数的典型用法代码示例。如果您正苦于以下问题:Python safe_str函数的具体用法?Python safe_str怎么用?Python safe_str使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了safe_str函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: differ
def differ(org_repo, org_ref, other_repo, other_ref,
           context=3, ignore_whitespace=False):
    """
    General differ between branches, bookmarks, revisions of two remote or
    local but related repositories.

    :param org_repo: object exposing ``scm_instance`` for the origin side
    :param org_ref: indexable reference, element ``[1]`` is used as revision
    :param other_repo: object exposing ``scm_instance`` for the other side
    :param other_ref: indexable reference, element ``[1]`` is used as revision
    :param context: forwarded to ``get_diff`` as ``context``
    :param ignore_whitespace: forwarded to ``get_diff``
    :returns: the value of ``get_diff`` when both scm instances compare
        equal, an empty string otherwise
    """
    org_repo_scm = org_repo.scm_instance
    other_repo_scm = other_repo.scm_instance

    # from here on the names refer to the low level ``_repo`` objects
    org_repo = org_repo_scm._repo
    other_repo = other_repo_scm._repo

    org_ref = safe_str(org_ref[1])
    other_ref = safe_str(other_ref[1])

    if org_repo_scm == other_repo_scm:
        # the format string has to be plain ``%s@%s`` pairs, anything else
        # raises ValueError when the message is rendered
        log.debug('running diff between %s@%s and %s@%s'
                  % (org_repo.path, org_ref,
                     other_repo.path, other_ref))
        _diff = org_repo_scm.get_diff(rev1=org_ref, rev2=other_ref,
                                      ignore_whitespace=ignore_whitespace,
                                      context=context)
        return _diff

    return ''  # FIXME: when is it ever relevant to return nothing?
开发者ID:jeffjirsa,项目名称:rhodecode,代码行数:31,代码来源:diffs.py
示例2: is_valid_repos_group
def is_valid_repos_group(repos_group_name, base_path):
    """
    Tell whether ``repos_group_name`` located under ``base_path`` is a
    repos group.

    :param repos_group_name: name of the candidate group
    :param base_path: directory the name is joined to
    :returns: True for a directory that is neither a repository itself nor
        placed directly inside one, False otherwise
    """
    group_path = os.path.join(safe_str(base_path),
                              safe_str(repos_group_name))

    # a repository is never a group
    if is_valid_repo(repos_group_name, base_path):
        return False

    # bare git repos have to be checked one level up, otherwise
    # branches/hooks/info/objects or other things inside a bare git repo
    # could be matched as groups
    try:
        get_scm(os.path.dirname(group_path))
    except VCSError:
        # parent is not a repository, so an existing directory is a group
        return os.path.isdir(group_path)
    return False
开发者ID:break123,项目名称:rhodecode,代码行数:27,代码来源:utils.py
示例3: commit_change
def commit_change(self, repo, repo_name, cs, user, author, message,
                  content, f_path):
    """
    Commits changes

    :param repo: SCM instance
    :param repo_name: not used in the visible part of this excerpt
    :param cs: changeset used as parent; also supplies branch and file mode
    :param user: resolved through ``self._get_user``
    :param author: commit author, converted with ``safe_unicode``
    :param message: commit message, converted with ``safe_unicode``
    :param content: new file content, converted with ``safe_str``
    :param f_path: path of the changed file, converted with ``safe_str``
    :raises IMCCommitError: when the in-memory commit raises any Exception
    """
    user = self._get_user(user)
    IMC = self._get_IMC_module(repo.alias)

    # decoding here will force that we have proper encoded values
    # in any other case this will throw exceptions and deny commit
    content = safe_str(content)
    path = safe_str(f_path)
    # message and author needs to be unicode
    # proper backend should then translate that into required type
    message = safe_unicode(message)
    author = safe_unicode(author)
    imc = IMC(repo)
    imc.change(FileNode(path, content, mode=cs.get_file_mode(f_path)))
    try:
        tip = imc.commit(message=message, author=author,
                         parents=[cs], branch=cs.branch)
    except Exception as e:
        # ``as`` form is accepted by Python 2.6+ and Python 3
        log.error(traceback.format_exc())
        raise IMCCommitError(str(e))
    # NOTE(review): the excerpt ends here; ``tip`` and ``repo_name`` are
    # presumably used by code that follows in the full source - confirm
开发者ID:adamscieszko,项目名称:rhodecode,代码行数:27,代码来源:scm.py
示例4: commit_change
def commit_change(self, repo, repo_name, cs, user, author, message,
                  content, f_path):
    """
    Commit a single file change on top of ``cs`` and register the push.

    :param repo: SCM instance
    :param repo_name: name used for cache invalidation and the push handler
    :param cs: parent changeset, also supplies branch and file mode
    :param user: resolved through ``self._get_user``
    :param author: commit author
    :param message: commit message
    :param content: new file content
    :param f_path: path of the changed file
    :returns: the changeset returned by the in-memory commit
    """
    committer = self._get_user(user)
    in_memory_cls = self._get_IMC_module(repo.alias)

    # byte strings for content and path: a value that cannot be converted
    # raises here and the commit is denied
    raw_content = safe_str(content)
    raw_path = safe_str(f_path)

    # message and author go to the backend as unicode, the backend is
    # expected to translate them into whatever type it needs
    unicode_message = safe_unicode(message)
    unicode_author = safe_unicode(author)

    in_memory = in_memory_cls(repo)
    changed_node = FileNode(raw_path, raw_content,
                            mode=cs.get_file_mode(f_path))
    in_memory.change(changed_node)
    new_tip = in_memory.commit(message=unicode_message,
                               author=unicode_author,
                               parents=[cs],
                               branch=cs.branch)

    self.mark_for_invalidation(repo_name)
    self._handle_push(repo, username=committer.username,
                      action='push_local', repo_name=repo_name,
                      revisions=[new_tip.raw_id])
    return new_tip
开发者ID:greenboxindonesia,项目名称:rhodecode,代码行数:32,代码来源:scm.py
示例5: commit_change
def commit_change(self, repo, repo_name, cs, user, author, message,
                  content, f_path):
    """
    Commit a single file change on top of ``cs`` and log the action.

    :param repo: SCM instance, ``repo.alias`` selects the backend
    :param repo_name: name used for the action log and cache invalidation
    :param cs: parent changeset, also supplies the branch
    :param user: passed to ``action_logger``
    :param author: commit author
    :param message: commit message
    :param content: new file content
    :param f_path: path of the changed file
    :returns: the changeset returned by the in-memory commit
    """
    # NOTE(review): an alias other than 'hg' or 'git' leaves
    # ``in_memory_cls`` unbound and fails further below
    if repo.alias == 'hg':
        from rhodecode.lib.vcs.backends.hg import MercurialInMemoryChangeset
        in_memory_cls = MercurialInMemoryChangeset
    elif repo.alias == 'git':
        from rhodecode.lib.vcs.backends.git import GitInMemoryChangeset
        in_memory_cls = GitInMemoryChangeset

    # byte strings for content and path: a value that cannot be converted
    # raises here and the commit is denied
    raw_content = safe_str(content)
    raw_path = safe_str(f_path)

    # message and author go to the backend as unicode, the backend is
    # expected to translate them into whatever type it needs
    unicode_message = safe_unicode(message)
    unicode_author = safe_unicode(author)

    in_memory = in_memory_cls(repo)
    in_memory.change(FileNode(raw_path, raw_content))
    new_tip = in_memory.commit(message=unicode_message,
                               author=unicode_author,
                               parents=[cs],
                               branch=cs.branch)

    action_logger(user, 'push_local:%s' % new_tip.raw_id, repo_name)
    self.mark_for_invalidation(repo_name)
    return new_tip
开发者ID:yujiro,项目名称:rhodecode,代码行数:34,代码来源:scm.py
示例6: __init__
def __init__(self, server, base_dn, port=389, bind_dn='', bind_pass='',
             tls_kind='PLAIN', tls_reqcert='DEMAND', ldap_version=3,
             ldap_filter='(&(objectClass=user)(!(objectClass=computer)))',
             search_scope='SUBTREE', attr_login='uid'):
    """
    Store the LDAP connection settings on the instance.

    :param server: address used to build ``LDAP_SERVER``
    :param base_dn: stored as ``BASE_DN`` (byte string)
    :param port: server port; a falsy value together with
        ``tls_kind='LDAPS'`` falls back to 636
    :param bind_dn: DN used for the read only bind
    :param bind_pass: password used for the read only bind
    :param tls_kind: ``'LDAPS'`` switches the URL scheme to ``ldaps``
    :param tls_reqcert: suffix of the ``ldap.OPT_X_TLS_*`` constant to use
    :param ldap_version: stored as ``ldap_version``
    :param ldap_filter: stored as ``LDAP_FILTER`` (byte string)
    :param search_scope: suffix of the ``ldap.SCOPE_*`` constant to use
    :param attr_login: stored as ``attr_login``
    """
    self.ldap_version = ldap_version
    ldap_server_type = 'ldap'

    self.TLS_KIND = tls_kind

    if self.TLS_KIND == 'LDAPS':
        # 636 is the registered port of the ldaps service
        port = port or 636
        ldap_server_type = ldap_server_type + 's'

    # value used when the ldap module has no matching OPT_X_TLS_* constant
    OPT_X_TLS_DEMAND = 2
    self.TLS_REQCERT = getattr(ldap, 'OPT_X_TLS_%s' % tls_reqcert,
                               OPT_X_TLS_DEMAND)
    self.LDAP_SERVER_ADDRESS = server
    self.LDAP_SERVER_PORT = port

    # USE FOR READ ONLY BIND TO LDAP SERVER
    self.LDAP_BIND_DN = safe_str(bind_dn)
    self.LDAP_BIND_PASS = safe_str(bind_pass)

    self.LDAP_SERVER = "%s://%s:%s" % (ldap_server_type,
                                       self.LDAP_SERVER_ADDRESS,
                                       self.LDAP_SERVER_PORT)

    self.BASE_DN = safe_str(base_dn)
    self.LDAP_FILTER = safe_str(ldap_filter)
    # unlike TLS_REQCERT there is no fallback: an unknown scope name raises
    # AttributeError
    self.SEARCH_SCOPE = getattr(ldap, 'SCOPE_%s' % search_scope)
    self.attr_login = attr_login
开发者ID:yujiro,项目名称:rhodecode,代码行数:31,代码来源:auth_ldap.py
示例7: get_file_history
def get_file_history(self, path, limit=None):
    """
    Returns history of file as list of ``Changeset`` objects for which
    file at given ``path`` has been modified, in the order the ids appear
    in the ``git log`` output.

    TODO: This function now uses os underlying 'git' and 'grep' commands
    which is generally not good. Should be replaced with algorithm
    iterating commits.

    :param path: path of the file
    :param limit: when truthy, passed to ``git log -n`` after ``safe_int``
    """
    self._get_filectx(path)
    changeset_id = safe_str(self.id)
    file_path = safe_str(path)

    # NOTE(review): the path is interpolated between double quotes; confirm
    # how run_git_command parses the string for paths containing quotes
    if limit:
        git_cmd = 'log -n %s --pretty="format: %%H" -s -p %s -- "%s"' % (
            safe_int(limit, 0), changeset_id, file_path
        )
    else:
        git_cmd = 'log --pretty="format: %%H" -s -p %s -- "%s"' % (
            changeset_id, file_path
        )

    stdout, _stderr = self.repository.run_git_command(git_cmd)

    # every 40 character hex run in the output is taken as a changeset id
    history = []
    for sha in re.findall(r'[0-9a-fA-F]{40}', stdout):
        history.append(self.repository.get_changeset(sha))
    return history
开发者ID:jeffjirsa,项目名称:rhodecode,代码行数:25,代码来源:changeset.py
示例8: command
def command(self):
    """
    Find directories matching ``REMOVED_REPO_PAT`` below the repositories
    location and delete them, optionally only those older than
    ``self.options.older_than`` and optionally after a confirmation.
    """
    # get SqlAlchemy session
    self._init_session()

    repos_location = RhodeCodeUi.get_repos_location()
    to_remove = []
    for dn, dirs, f in os.walk(safe_str(repos_location)):
        alldirs = list(dirs)
        # emptied in place so os.walk only descends into what is re-added
        del dirs[:]
        # looks like a hg repo or a bare git repo: do not look inside
        if ".hg" in alldirs or "objects" in alldirs and ("refs" in alldirs or "packed-refs" in f):
            continue
        for loc in alldirs:
            if REMOVED_REPO_PAT.match(loc):
                to_remove.append([os.path.join(dn, loc), self._extract_date(loc)])
            else:
                dirs.append(loc)

    # filter older than (if present)!
    now = datetime.datetime.now()
    older_than = self.options.older_than
    if older_than:
        # compared against an age (now - date), so this is a time span
        older_than_date = self._parse_older_than(older_than)
        to_remove = [[name, date_] for name, date_ in to_remove
                     if now - date_ > older_than_date]
        # sys.stdout.write instead of the print statement, which is a
        # syntax error on Python 3
        sys.stdout.write("removing %s deleted repos older than %s (%s)\n" % (
            len(to_remove),
            older_than,
            older_than_date,
        ))
    else:
        sys.stdout.write("removing all [%s] deleted repos\n" % len(to_remove))

    if self.options.dont_ask or not to_remove:
        # don't ask just remove !
        remove = True
    else:
        remove = ask_ok(
            "the following repositories will be deleted completely:\n%s\n"
            "are you sure you want to remove them [y/n]?"
            % ", \n".join(["%s removed on %s" % (safe_str(x[0]), safe_str(x[1])) for x in to_remove])
        )

    if remove:
        for path, date_ in to_remove:
            sys.stdout.write("removing repository %s\n" % path)
            shutil.rmtree(path)
    else:
        sys.stdout.write("nothing done exiting...\n")
        sys.exit(0)
开发者ID:jeffjirsa,项目名称:rhodecode,代码行数:53,代码来源:cleanup.py
示例9: get_node
def get_node(self, repo, path):
    """
    Fetch the file node for a full filesystem path. Works on byte strings
    for hg/git compatibility.

    :param repo: scm repo instance
    :param path: full path including root location
    :return: FileNode
    """
    repo_prefix = safe_str(repo.path) + '/'
    # whatever follows the first occurrence of the repository root is the
    # path inside the repository
    _head, _sep, inner_path = safe_str(path).partition(repo_prefix)
    changeset = self._get_index_changeset(repo)
    return changeset.get_node(inner_path)
开发者ID:adamscieszko,项目名称:rhodecode,代码行数:14,代码来源:daemon.py
示例10: command
def command(self):
    """
    Find directories matching ``REMOVED_REPO_PAT`` below the repositories
    location and delete them, optionally only those older than
    ``self.options.older_than`` and optionally after a confirmation.
    """
    logging.config.fileConfig(self.path_to_ini_file)
    from pylons import config

    #get to remove repos !!
    add_cache(config)
    engine = engine_from_config(config, 'sqlalchemy.db1.')
    init_model(engine)

    repos_location = RhodeCodeUi.get_repos_location()
    to_remove = []
    for dn, dirs, f in os.walk(safe_str(repos_location)):
        for loc in dirs:
            if REMOVED_REPO_PAT.match(loc):
                # keep the full path: the walk is recursive, so a bare name
                # joined to repos_location is wrong for nested directories
                to_remove.append([os.path.join(dn, loc),
                                  self._extract_date(loc)])

    #filter older than (if present)!
    now = datetime.datetime.now()
    older_than = self.options.older_than
    if older_than:
        # compared against an age (now - date), so this is a time span
        older_than_date = self._parse_older_than(older_than)
        to_remove = [[name, date_] for name, date_ in to_remove
                     if now - date_ > older_than_date]
        # sys.stdout.write instead of the print statement, which is a
        # syntax error on Python 3
        sys.stdout.write('removing [%s] deleted repos older than %s[%s]\n'
                         % (len(to_remove), older_than, older_than_date))
    else:
        sys.stdout.write('removing all [%s] deleted repos\n'
                         % len(to_remove))

    if self.options.dont_ask or not to_remove:
        # don't ask just remove !
        remove = True
    else:
        remove = ask_ok('are you sure to remove listed repos \n%s [y/n]?'
                        % ', \n'.join(['%s removed on %s'
                        % (safe_str(x[0]), safe_str(x[1])) for x in to_remove]))

    if remove:
        for path, date_ in to_remove:
            sys.stdout.write('removing repository %s\n' % path)
            shutil.rmtree(path)
    else:
        sys.stdout.write('nothing done exiting...\n')
        sys.exit(0)
开发者ID:break123,项目名称:rhodecode,代码行数:48,代码来源:cleanup.py
示例11: _get_scm_size
def _get_scm_size(alias, root_path):
    """
    Sum up file sizes below ``root_path``, split into files whose directory
    path contains the scm directory name and all the other files.

    :param alias: scm directory name; a missing leading dot is added, so
        ``'hg'`` and ``'.hg'`` are equivalent
    :param root_path: directory to walk
    :returns: tuple ``(scm size, root size, total size)``, each value
        formatted with ``h.format_byte_size``
    """
    if not alias.startswith('.'):
        # the dot goes in front ('.hg'); appending it ('hg.') would match
        # no scm directory
        alias = '.' + alias

    size_scm, size_root = 0, 0
    for path, _dirs, files in os.walk(safe_str(root_path)):
        inside_scm = alias in path
        for f in files:
            try:
                size = os.path.getsize(os.path.join(path, f))
            except OSError:
                # unreadable or vanished file, it is simply not counted
                continue
            if inside_scm:
                size_scm += size
            else:
                size_root += size

    size_scm_f = h.format_byte_size(size_scm)
    size_root_f = h.format_byte_size(size_root)
    size_total_f = h.format_byte_size(size_root + size_scm)

    return size_scm_f, size_root_f, size_total_f
开发者ID:greenboxindonesia,项目名称:rhodecode,代码行数:25,代码来源:hooks.py
示例12: gravatar_url
def gravatar_url(email_address, size=30, ssl_enabled=True):
    """
    Build the avatar url for ``email_address``.

    :param email_address: user e-mail; a falsy value is treated as the
        default (anonymous) address
    :param size: requested size, used for the image name, the ``{size}``
        placeholder and the ``s`` query parameter
    :param ssl_enabled: selects the https gravatar base url when true
    :returns: url of a static image when gravatar is disabled or the
        address is the default one, the filled in alternative template when
        one is configured, a gravatar.com url otherwise
    """
    from pylons import url  # doh, we need to re-import url to mock it later
    from rhodecode import CONFIG

    # NOTE(review): literal restored after e-mail obfuscation garbled it in
    # this copy - confirm against upstream
    _def = 'anonymous@rhodecode.org'  # default gravatar
    use_gravatar = str2bool(CONFIG.get('use_gravatar'))
    alternative_gravatar_url = CONFIG.get('alternative_gravatar_url', '')

    email_address = email_address or _def
    if not use_gravatar or not email_address or email_address == _def:
        # static image with the size closest to the requested one
        f = lambda a, l: min(l, key=lambda x: abs(x - a))
        return url("/images/user%s.png" % f(size, [14, 16, 20, 24, 30]))

    # hashlib crashes on unicode items, so the digest is always computed
    # from the byte string form (assumes safe_str returns str unchanged)
    md5email = hashlib.md5(safe_str(email_address).lower()).hexdigest()

    if use_gravatar and alternative_gravatar_url:
        tmpl = alternative_gravatar_url
        parsed_url = urlparse.urlparse(url.current(qualified=True))
        tmpl = tmpl.replace('{email}', email_address)\
                   .replace('{md5email}', md5email) \
                   .replace('{netloc}', parsed_url.netloc)\
                   .replace('{scheme}', parsed_url.scheme)\
                   .replace('{size}', str(size))
        return tmpl

    default = 'identicon'
    baseurl_nossl = "http://www.gravatar.com/avatar/"
    baseurl_ssl = "https://secure.gravatar.com/avatar/"
    baseurl = baseurl_ssl if ssl_enabled else baseurl_nossl

    # construct the url
    gravatar_url = baseurl + md5email + "?"
    gravatar_url += urllib.urlencode({'d': default, 's': str(size)})

    return gravatar_url
开发者ID:greenboxindonesia,项目名称:rhodecode,代码行数:35,代码来源:helpers.py
示例13: _index
def _index(self, revision, method):
    """
    Resolve ``revision`` (a single revision or a ``start...end`` range)
    into ``c.cs_ranges``.

    :param revision: revision or ``start...end`` range
    :param method: not used in the visible part of this excerpt
    :raises HTTPNotFound: when the lookup fails or returns nothing
    """
    c.anchor_url = anchor_url
    c.ignorews_url = _ignorews_url
    c.context_url = _context_url
    c.fulldiff = fulldiff = request.GET.get('fulldiff')
    #get ranges of revisions if preset
    rev_range = revision.split('...')[:2]
    enable_comments = True
    try:
        if len(rev_range) == 2:
            enable_comments = False
            rev_start = rev_range[0]
            rev_end = rev_range[1]
            rev_ranges = c.rhodecode_repo.get_changesets(start=rev_start,
                                                         end=rev_end)
        else:
            rev_ranges = [c.rhodecode_repo.get_changeset(revision)]

        c.cs_ranges = list(rev_ranges)
        if not c.cs_ranges:
            raise RepositoryError('Changeset range returned empty result')

    except (RepositoryError, ChangesetDoesNotExistError, Exception) as e:
        # ``as`` form is accepted by Python 2.6+ and Python 3
        log.error(traceback.format_exc())
        h.flash(safe_str(e), category='error')
        raise HTTPNotFound()
    # NOTE(review): the excerpt ends here; ``fulldiff``, ``enable_comments``
    # and ``method`` are presumably used by code that follows - confirm
开发者ID:adamscieszko,项目名称:rhodecode,代码行数:26,代码来源:changeset.py
示例14: __get_instance
def __get_instance(self):
    """
    Create the backend repository object for ``self.repo_full_path``.

    :returns: the repository instance, or None when the scm type cannot be
        detected or when a hg repository reports itself as hidden
    """
    full_path = self.repo_full_path

    try:
        scm_alias = get_scm(full_path)[0]
        log.debug("Creating instance of %s repository" % scm_alias)
        backend_cls = get_backend(scm_alias)
    except VCSError:
        log.error(traceback.format_exc())
        log.error(
            "Perhaps this repository is in db and not in "
            "filesystem run rescan repositories with "
            '"destroy old data " option from admin panel'
        )
        return None

    if alias_is_not_hg(scm_alias):
        return backend_cls(full_path, create=False)

    # hg gets a byte string path and the ui object
    hg_repo = backend_cls(safe_str(full_path), create=False,
                          baseui=self._ui)
    # skip hidden web repository
    if hg_repo._get_hidden():
        return None
    return hg_repo


def alias_is_not_hg(scm_alias):
    """Tell whether ``scm_alias`` names a backend other than hg."""
    return scm_alias != "hg"
开发者ID:break123,项目名称:rhodecode,代码行数:27,代码来源:db_1_2_0.py
示例15: _get_cache_parameters
def _get_cache_parameters(query):
    """
    For a query with cache_region and cache_namespace configured, return
    the corresponding Cache instance and cache key, based on this query's
    current criterion and parameter values.

    :param query: query carrying a ``_cache_parameters`` attribute
    :returns: tuple ``(cache, cache_key)``
    :raises ValueError: when the query has no ``_cache_parameters``
    """
    if not hasattr(query, '_cache_parameters'):
        raise ValueError("This Query does not have caching "
                         "parameters configured.")

    region, namespace, cache_key = query._cache_parameters
    namespace = _namespace_from_query(namespace, query)

    if cache_key is None:
        # no explicit key: build one from the query's parameter values,
        # followed by limit and offset when they are set
        key_parts = [safe_str(param) for param in _params_from_query(query)]
        paging = [str(query._limit), str(query._offset)]
        key_parts.extend([part for part in paging
                          if part not in ['None', None, u'None']])
        cache_key = " ".join(key_parts)

    if cache_key is None:
        raise Exception('Cache key cannot be None')

    # get cache
    cache = get_cache_region(namespace, region)
    # optional - the cache_key could be hashed too for consistent length,
    # e.g. str(uuid.uuid5(uuid.NAMESPACE_DNS, cache_key))
    return cache, cache_key
开发者ID:adamscieszko,项目名称:rhodecode,代码行数:33,代码来源:caching_query.py
示例16: gravatar_url
def gravatar_url(email_address, size=30):
    """
    Build the avatar url for ``email_address``.

    :param email_address: user e-mail
    :param size: requested size, used for the image name, the ``{size}``
        placeholder and the ``s`` query parameter
    :returns: the filled in alternative template when gravatar is enabled
        and a template is configured, url of a static image when gravatar
        is disabled or the address is empty/anonymous, a gravatar.com url
        otherwise
    """
    from pylons import url  ## doh, we need to re-import url to mock it later

    if(str2bool(config['app_conf'].get('use_gravatar')) and
            config['app_conf'].get('alternative_gravatar_url')):
        # NOTE(review): a None address fails in this branch, the empty
        # address check only happens further below - confirm intent
        tmpl = config['app_conf'].get('alternative_gravatar_url', '')
        parsed_url = urlparse.urlparse(url.current(qualified=True))
        # hashlib crashes on unicode items, so the digest is computed from
        # the byte string form (assumes safe_str returns str unchanged)
        md5email = hashlib.md5(safe_str(email_address).lower()).hexdigest()
        tmpl = tmpl.replace('{email}', email_address)\
                   .replace('{md5email}', md5email) \
                   .replace('{netloc}', parsed_url.netloc)\
                   .replace('{scheme}', parsed_url.scheme)\
                   .replace('{size}', str(size))
        return tmpl

    # NOTE(review): literal restored after e-mail obfuscation garbled it in
    # this copy - confirm against upstream
    if (not str2bool(config['app_conf'].get('use_gravatar')) or
            not email_address or email_address == 'anonymous@rhodecode.org'):
        # static image with the size closest to the requested one
        f = lambda a, l: min(l, key=lambda x: abs(x - a))
        return url("/images/user%s.png" % f(size, [14, 16, 20, 24, 30]))

    ssl_enabled = 'https' == request.environ.get('wsgi.url_scheme')
    default = 'identicon'
    baseurl_nossl = "http://www.gravatar.com/avatar/"
    baseurl_ssl = "https://secure.gravatar.com/avatar/"
    baseurl = baseurl_ssl if ssl_enabled else baseurl_nossl

    if isinstance(email_address, unicode):
        #hashlib crashes on unicode items
        email_address = safe_str(email_address)
    # construct the url
    gravatar_url = baseurl + hashlib.md5(email_address.lower()).hexdigest() + "?"
    gravatar_url += urllib.urlencode({'d': default, 's': str(size)})

    return gravatar_url
开发者ID:yujiro,项目名称:rhodecode,代码行数:32,代码来源:helpers.py
示例17: safe_str
def safe_str(unicode_, to_encoding=None):
    """
    safe str function. Thin wrapper that forwards to
    ``rhodecode.lib.utils2.safe_str``.

    :param unicode_: unicode to encode
    :param to_encoding: forwarded unchanged
    :rtype: str
    :returns: whatever ``rhodecode.lib.utils2.safe_str`` returns
    """
    # imported at call time; inside this function the name ``safe_str``
    # refers to the imported implementation, not to this wrapper
    from rhodecode.lib.utils2 import safe_str
    # the old inline implementation that used to follow this return could
    # never run and has been dropped
    return safe_str(unicode_, to_encoding)
开发者ID:break123,项目名称:rhodecode,代码行数:33,代码来源:__init__.py
示例18: __create_repo
def __create_repo(self, repo_name, alias, new_parent_id, clone_uri=False):
    """
    Create the repository on the filesystem. Group aware: with a parent
    group the repository is placed inside the path of that group.

    :param repo_name: name of the new repository
    :param alias: backend alias passed to ``get_backend``
    :param new_parent_id: id of the parent group, falsy for no group
    :param clone_uri: passed to the backend as ``src_url``
    :raises Exception: when the target path already is a valid repository
        or a valid group
    """
    from rhodecode.lib.utils import is_valid_repo, is_valid_repos_group

    if new_parent_id:
        group_parts = RepoGroup.get(new_parent_id).full_path.split(RepoGroup.url_sep())
        parent_path = os.sep.join(group_parts)
    else:
        parent_path = ""

    # we need to make it str for mercurial
    path_parts = [safe_str(part)
                  for part in [self.repos_path, parent_path, repo_name]]
    repo_path = os.path.join(*path_parts)

    # the target must be neither an existing repository nor a group
    if is_valid_repo(repo_path, self.repos_path):
        raise Exception("This path %s is a valid repository" % repo_path)
    if is_valid_repos_group(repo_path, self.repos_path):
        raise Exception("This path %s is a valid group" % repo_path)

    log.info("creating repo %s in %s @ %s" % (repo_name, safe_unicode(repo_path), clone_uri))
    backend_cls = get_backend(alias)
    backend_cls(repo_path, create=True, src_url=clone_uri)
开发者ID:seacoastboy,项目名称:rhodecode,代码行数:34,代码来源:repo.py
示例19: get_paths
def get_paths(self, repo):
    """
    Walk the index changeset of ``repo`` from its root and collect the
    path of every file, prefixed with the repository path.

    :param repo: scm repo instance
    :returns: set of joined paths; on ``RepositoryError`` the paths
        collected so far (possibly none)
    """
    collected = set()
    try:
        changeset = self._get_index_changeset(repo)
        for _top, _subdirs, file_nodes in changeset.walk('/'):
            collected.update(
                jn(safe_str(repo.path), safe_str(node.path))
                for node in file_nodes
            )
    except RepositoryError:
        # logged at debug level only, the caller just gets fewer paths
        log.debug(traceback.format_exc())
    return collected
开发者ID:adamscieszko,项目名称:rhodecode,代码行数:16,代码来源:daemon.py
示例20: is_valid_repo
def is_valid_repo(repo_name, base_path):
    """
    Tell whether ``repo_name`` located under ``base_path`` is a valid
    repository.

    :param repo_name: name of the candidate repository
    :param base_path: directory the name is joined to
    :return: True when ``get_scm`` accepts the joined path, False when it
        raises ``VCSError``
    """
    candidate = os.path.join(safe_str(base_path), safe_str(repo_name))
    try:
        get_scm(candidate)
    except VCSError:
        return False
    else:
        return True
开发者ID:elfixit,项目名称:rhodecode,代码行数:16,代码来源:utils.py
注:本文中的rhodecode.lib.utils2.safe_str函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论