本文整理汇总了Python中six.print_函数的典型用法代码示例。如果您正苦于以下问题:Python print_函数的具体用法?Python print_怎么用?Python print_使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了print_函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: run
def run(self):
cmds = ["jconsole", "localhost:%s" % self.node.jmx_port]
try:
subprocess.call(cmds)
except OSError:
print_("Could not start jconsole. Please make sure jconsole can be found in your $PATH.")
exit(1)
开发者ID:smccarthy788,项目名称:ccm,代码行数:7,代码来源:node_cmds.py
示例2: validate
def validate(self, parser, options, args):
Cmd.validate(self, parser, options, args, load_cluster=True)
try:
self.setting = common.parse_settings(args)
except common.ArgumentError as e:
print_(str(e), file=sys.stderr)
exit(1)
开发者ID:aeriksson,项目名称:ccm,代码行数:7,代码来源:cluster_cmds.py
示例3: print_help
def print_help():
print_("""avaliable command:
help --- show this help
ls --- list friends and groups
talk --- talk to friends
quit --- quit program
""")
开发者ID:HenryHu,项目名称:lwqq,代码行数:7,代码来源:lwqq-cli.py
示例4: build_static
def build_static(self, *args, **options):
"""
Builds the static files directory as well as robots.txt and favicon.ico
"""
logger.debug("Building static directory")
if self.verbosity > 1:
six.print_("Building static directory")
management.call_command("collectstatic", interactive=False, verbosity=0)
target_dir = os.path.join(self.build_dir, settings.STATIC_URL[1:])
if os.path.exists(settings.STATIC_ROOT) and settings.STATIC_URL:
if getattr(settings, "BAKERY_GZIP", False):
self.copytree_and_gzip(settings.STATIC_ROOT, target_dir)
# if gzip isn't enabled, just copy the tree straight over
else:
shutil.copytree(settings.STATIC_ROOT, target_dir)
# If they exist in the static directory, copy the robots.txt
# and favicon.ico files down to the root so they will work
# on the live website.
robot_src = os.path.join(target_dir, "robots.txt")
if os.path.exists(robot_src):
shutil.copy(robot_src, os.path.join(settings.BUILD_DIR, "robots.txt"))
favicon_src = os.path.join(target_dir, "favicon.ico")
if os.path.exists(favicon_src):
shutil.copy(favicon_src, os.path.join(settings.BUILD_DIR, "favicon.ico"))
开发者ID:datadesk,项目名称:django-bakery,代码行数:28,代码来源:build.py
示例5: findA_M0
def findA_M0():
prefix = b"\x00" + b"\xf2" + b"\x00" * (256 - 2 - 16)
c = mysrp.Client()
import time
start = time.time()
num_near_misses = 0
for count in thencount():
if count > 300 and count % 500 == 0:
now = time.time()
print_(count, "tries", now - start)
start = now
if count > 1000000:
raise ValueError("unable to find suitable value in reasonable time")
a_str = prefix + binascii.unhexlify("%032x" % count)
assert len(a_str) == 2048 / 8, (len(a_str), 2048 / 8)
a = mysrp.bytes_to_long(a_str)
A = c.one(a)
# require that the computed M1 has a leading zero
c.two(B, srpSalt, emailUTF8, srpPW)
if c._debug_M1_bytes[0:1] != b"\x00":
continue
print_("found a on count", count)
printdec("private a (normally random)", a)
printhex("private a (hex)", a_str, groups_per_line=2)
return a, A
开发者ID:rfk,项目名称:picl-spec-crypto,代码行数:26,代码来源:picl-crypto.py
示例6: register
def register(self):
"""Registers the workflow type and task types with SWF
It is necessary to do this each time a new task is added to a workflow.
It is safest to run this before each call to :meth:`execute` if you are
just launching a workflow from a cron. However, if you are launching
many workflows and calling :meth:`execute` many times, you may want to
consider calling this method only when necessary because it can
contribute to an SWF API throttling issue.
"""
tasks = get_task_configurations(self.workflow_task)
registerables = []
registerables.append(swf.Domain(name=self.domain))
task_dats = set((t['task_family'], t['task_list'])
for (t_id, t) in iteritems(tasks))
for task_dat in task_dats:
registerables.append(swf.ActivityType(domain=self.domain,
version=self.version,
name=task_dat[0],
task_list=task_dat[1]))
wf_name = self.workflow_task.task_family
wf_task_list = getattr(self.workflow_task, 'swf_task_list', 'default')
registerables.append(swf.WorkflowType(domain=self.domain,
version=self.version,
name=wf_name,
task_list=wf_task_list))
for swf_entity in registerables:
try:
swf_entity.register()
print_(swf_entity.name, 'registered successfully')
except (SWFDomainAlreadyExistsError, SWFTypeAlreadyExistsError):
print_(swf_entity.__class__.__name__, swf_entity.name,
'already exists')
开发者ID:koenvo,项目名称:luigi-swf,代码行数:33,代码来源:executor.py
示例7: print_result
def print_result(self, result):
if self.args.get('format') is None:
stylesheet = self.args.get('stylesheet')
show_conn_info = bool(stylesheet)
elif self.args.get('format') == 'none':
stylesheet = None
show_conn_info = False
elif self.args.get('format') == 'xml':
stylesheet = None
show_conn_info = True
else:
stylesheet = self.args.get('stylesheet')
if not stylesheet:
stylesheet = self.config.get_region_option('vpn-stylesheet')
if stylesheet:
stylesheet = stylesheet.format(format=self.args['format'])
else:
self.log.warn('current region has no stylesheet')
msg = ('current region has no XSLT stylesheet to format '
'output; connection info will not be shown (try '
'specifying one with "--stylesheet" or using '
'"--format xml")')
six.print_(msg, file=sys.stderr)
show_conn_info = bool(stylesheet)
for vpn in result.get('vpnConnectionSet', []):
self.print_vpn_connection(vpn, show_conn_info=show_conn_info,
stylesheet=stylesheet)
开发者ID:gholms,项目名称:euca2ools,代码行数:27,代码来源:describevpnconnections.py
示例8: showStats
def showStats( self ):
'''Shows a few stats on standard output. Shouldn't be called before buildGraph().
'''
if self.numInputSentences > 0:
six.print_( "Character " + self.charLabel + " has a total of " + str( self.numInputWords ) + " words over " + str( self.numInputSentences ) + " sentences for an average of " + str( self.numInputWords / self.numInputSentences ) + " words/sentence.")
else:
six.print_( "Character " + self.charLabel + " has a total of " + str( self.numInputWords ) + " words." )
开发者ID:TheOpenSourceNinja,项目名称:Markov-Comic-Generator,代码行数:7,代码来源:generator.py
示例9: run
def run(self):
try:
common.invalidate_cache()
except Exception as e:
print_(str(e), file=sys.stderr)
print_("Error while deleting cache. Please attempt manually.")
exit(1)
开发者ID:pologood,项目名称:ccm,代码行数:7,代码来源:cluster_cmds.py
示例10: n_mult
def n_mult(a,b):
if isinstance(a,N_polynomial) and isinstance(b,N_polynomial):
total={}
for keya in a.terms_:
for keyb in b.terms_:
m = prodMonomials(keya,keyb)
coeff=a.terms_[keya]*b.terms_[keyb]
if m in total:
newc = coeff + total[m]
if newc==0:
del total[m]
else:
total[m]=newc
else:
total[m]=coeff
return N_polynomial(n_ar=total)
if isinstance(a,N_constantNum) and isinstance(b,N_polynomial):
# print "multiplyBy ", a.a
if(a.num_==1):
return b
out=b.copy()
out.scalarMultiply(a.num_)
return out
if isinstance(a,N_constantNum) and isinstance(b,N_constantNum):
return N_constantNum(a.num_*b.num_)
print_(a, b)
raise NotImplementedError()
开发者ID:btgraham,项目名称:LogSignatureDemo,代码行数:27,代码来源:logsignature.py
示例11: start
def start(self):
""" Start the monitor """
if CURSES_SUPPORTED:
curses.wrapper(self.run)
else:
six.print_("Your system does not have curses installed. "
"Cannot use 'watch'")
开发者ID:alexanderlz,项目名称:dql,代码行数:7,代码来源:monitor.py
示例12: run_cqlsh
def run_cqlsh(self, cmds=None, show_output=False, cqlsh_options=[]):
cdir = self.get_cassandra_dir()
cli = common.join_bin(cdir, 'bin', 'cqlsh')
env = common.make_cassandra_env(cdir, self.get_path())
host = self.network_interfaces['thrift'][0]
port = self.network_interfaces['thrift'][1]
args = cqlsh_options + [ host, str(port) ]
sys.stdout.flush()
if cmds is None:
os.execve(cli, [ common.platform_binary('cqlsh') ] + args, env)
else:
p = subprocess.Popen([ cli ] + args, env=env, stdin=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
for cmd in cmds.split(';'):
p.stdin.write(cmd + ';\n')
p.stdin.write("quit;\n")
p.wait()
for err in p.stderr:
print_("(EE) ", err, end='')
if show_output:
i = 0
for log in p.stdout:
# first four lines are not interesting
if i >= 4:
print_(log, end='')
i = i + 1
开发者ID:pydeveloper94,项目名称:ccm,代码行数:25,代码来源:node.py
示例13: __init__
def __init__(self, ncpus="autodetect", interface="0.0.0.0",
broadcast="255.255.255.255", port=None, secret=None,
timeout=None, restart=False, proto=2, socket_timeout=3600, pid_file=None):
pp.Server.__init__(self, ncpus, (), secret, restart,
proto, socket_timeout)
if pid_file:
with open(pid_file, 'w') as pfile:
six.print_(os.getpid(), file=pfile)
atexit.register(os.remove, pid_file)
self.host = interface
self.bcast = broadcast
if port is not None:
self.port = port
else:
self.port = ppc.randomport()
self.timeout = timeout
self.ncon = 0
self.last_con_time = time.time()
self.ncon_lock = threading.Lock()
self.logger.debug("Strarting network server interface=%s port=%i"
% (self.host, self.port))
if self.timeout is not None:
self.logger.debug("ppserver will exit in %i seconds if no "\
"connections with clients exist" % (self.timeout))
ppc.start_thread("timeout_check", self.check_timeout)
开发者ID:bsipocz,项目名称:ppft,代码行数:26,代码来源:ppserver.py
示例14: on_search
def on_search(args):
# TODO: Decode via actual tty encoding
try:
q = args.q[0].decode("utf-8")
except AttributeError:
q = args.q[0]
pkg_names = set()
# First, check for exact case-insensitive name matches
for pkg in session.query(Package).filter(collate(Package.name,"NOCASE")==q).all():
pkg_names.add(pkg.name)
# Check for substring name matches
for pkg in session.query(Package).filter(Package.name.like(u('%{0}%').format(q))).all():
pkg_names.add(pkg.name)
# Check for description matches
for pkg in session.query(Package).filter(Package.description.like(u('%{0}%').format(q))).all():
pkg_names.add(pkg.name)
if len(pkg_names) == 0:
print_(u('No matching packages found.'))
return
# Nice column formatting
max_len_name = max( len(name) for name in pkg_names )
for pkg_name in sorted(pkg_names):
pkg = session.query(Package).get(pkg_name)
print_(u('{name:{max_len_name}} {version:10} {desc}'.format(name=pkg.name, version=pkg.version, desc=pkg.description, max_len_name=max_len_name)))
开发者ID:mostlyuseful,项目名称:pypitools,代码行数:27,代码来源:pypipkglist.py
示例15: _create
def _create(self, name, stack_id, username=None, ssh_keys=None,
user_scripts=None, node_groups=None, connectors=None,
wait=False):
"""
CLI-only; cluster create command
"""
if ssh_keys is None:
ssh_keys = [DEFAULT_SSH_KEY]
try:
return self.create(name, stack_id, username, ssh_keys,
user_scripts, node_groups, connectors, wait)
except error.RequestError as exc:
if not (ssh_keys == [DEFAULT_SSH_KEY] and
'Cannot find requested ssh_keys' in str(exc)):
raise
six.print_('SSH key does not exist; creating...')
# Create the SSH key for the user and then attempt to create the
# cluster again
with open(expand('$HOME/.ssh/id_rsa.pub')) as f:
self._client.credentials.create_ssh_key(
DEFAULT_SSH_KEY,
f.read().strip())
return self.create(name, stack_id, username, ssh_keys,
user_scripts, node_groups, connectors, wait)
开发者ID:highlycaffeinated,项目名称:python-lavaclient,代码行数:28,代码来源:clusters.py
示例16: on_show
def on_show(args):
for pkg_name in args.package_names:
if six.PY2:
pkg_name = pkg_name.decode("utf-8")
pkg = session.query(Package).get(pkg_name)
if pkg:
print_(u('{name}\t{version}\t{desc}'.format(name=pkg.name, version=pkg.version, desc=pkg.description)))
开发者ID:mostlyuseful,项目名称:pypitools,代码行数:7,代码来源:pypipkglist.py
示例17: test_task_with_distutils_dep
def test_task_with_distutils_dep():
_sdist.reset()
@tasks.task
@tasks.needs("paver.tests.test_setuputils.sdist")
def sdist():
assert _sdist.called
env = _set_environment(sdist=sdist)
env.options = options.Bunch(setup=options.Bunch())
install_distutils_tasks()
d = _get_distribution()
d.cmdclass['sdist'] = _sdist
task_obj = env.get_task('sdist')
assert task_obj == sdist
needs_obj = env.get_task(task_obj.needs[0])
assert isinstance(needs_obj, DistutilsTask)
assert needs_obj.command_class == _sdist
tasks._process_commands(['sdist', "-f"])
assert sdist.called
assert _sdist.called
cmd = d.get_command_obj('sdist')
print_("Cmd is: %s" % cmd)
assert cmd.foo
assert _sdist.foo_set
开发者ID:jaraco,项目名称:paver,代码行数:27,代码来源:test_setuputils.py
示例18: output_profile_result
def output_profile_result(env):
stdout_trap = six.StringIO()
env.profile_deco.print_stats(stdout_trap)
profile_output = stdout_trap.getvalue()
profile_output = profile_output.rstrip()
six.print_(profile_output)
env.event_bus.publish_event(Event(EVENT.ON_LINE_PROFILER_RESULT, result=profile_output))
开发者ID:vt100,项目名称:rqalpha,代码行数:7,代码来源:main.py
示例19: get_bars
def get_bars(self, order_book_id, fields=None):
try:
s, e = self._index[order_book_id]
except KeyError:
six.print_(_(u"No data for {}").format(order_book_id))
return
if fields is None:
# the first is date
fields = self._table.names[1:]
if len(fields) == 1:
return self._converter.convert(fields[0], self._table.cols[fields[0]][s:e])
# remove datetime if exist in fields
self._remove_(fields, 'datetime')
dtype = np.dtype([('datetime', np.uint64)] +
[(f, self._converter.field_type(f, self._table.cols[f].dtype))
for f in fields])
result = np.empty(shape=(e - s, ), dtype=dtype)
for f in fields:
result[f] = self._converter.convert(f, self._table.cols[f][s:e])
result['datetime'] = self._table.cols['date'][s:e]
result['datetime'] *= 1000000
return result
开发者ID:8dspaces,项目名称:rqalpha,代码行数:27,代码来源:daybar_store.py
示例20: __init__
def __init__(self, username,
password,
proxy=None,
parser=DEFAULT_PARSER,
ignorefiles=None,
max_path_part_len=None,
gzip_courses=False,
wk_filter=None):
self.username = username
self.password = password
self.parser = parser
# Split "ignorefiles" argument on commas, strip, remove prefixing dot
# if there is one, and filter out empty tokens.
self.ignorefiles = [x.strip()[1:] if x[0] == '.' else x.strip()
for x in ignorefiles.split(',') if len(x)]
self.session = None
self.proxy = proxy
self.max_path_part_len = max_path_part_len
self.gzip_courses = gzip_courses
try:
self.wk_filter = map(
int, wk_filter.split(",")) if wk_filter else None
except Exception as e:
print_("Invalid week filter, should be a comma separated list of integers", e)
exit()
开发者ID:hsingh23,项目名称:coursera-dl,代码行数:29,代码来源:courseradownloader.py
注:本文中的six.print_函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论