• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python utils.shell函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中utils.shell函数的典型用法代码示例。如果您正苦于以下问题：Python shell函数的具体用法？Python shell怎么用？Python shell使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了shell函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: install_cert

def install_cert(domain, ssl_cert, ssl_chain, env):
	"""Validate a certificate for `domain` and install it under STORAGE_ROOT/ssl.

	Args:
		domain: The domain the certificate is being installed for.
		ssl_cert: The certificate as ASCII text (presumably PEM -- it is
			written to a ``.pem`` file).
		ssl_chain: The chain certificates as ASCII text; written below the
			certificate in the combined file.
		env: Mapping providing at least ``STORAGE_ROOT`` and
			``PRIMARY_HOSTNAME``.

	Returns:
		A status string. If check_certificate() does not report "OK", the
		(possibly expanded) error text is returned and nothing is installed.
		Otherwise a newline-joined list starting with "OK", followed by
		"mail services restarted" when the primary hostname's certificate
		was replaced, and whatever do_web_update() returns.
	"""
	import tempfile

	ssl_dir = os.path.join(env["STORAGE_ROOT"], 'ssl')

	# Write the combined cert+chain to a temporary path and validate that it is OK.
	# The certificate always goes above the chain.
	fd, fn = tempfile.mkstemp('.pem')
	try:
		# fdopen takes ownership of the descriptor, so it is closed even if
		# encoding or writing fails.
		with os.fdopen(fd, "wb") as f:
			f.write((ssl_cert + '\n' + ssl_chain).encode("ascii"))

		# Do validation on the certificate before installing it.
		ssl_private_key = os.path.join(ssl_dir, 'ssl_private_key.pem')
		cert_status, cert_status_details = check_certificate(domain, fn, ssl_private_key)
		if cert_status != "OK":
			if cert_status == "SELF-SIGNED":
				cert_status = "This is a self-signed certificate. I can't install that."
			if cert_status_details is not None:
				cert_status += " " + cert_status_details
			return cert_status

		# Where to put it?
		# Make a unique path for the certificate.
		from cryptography.hazmat.primitives import hashes
		from binascii import hexlify
		cert = load_pem(load_cert_chain(fn)[0])
		_, cn = get_certificate_domains(cert)
		path = "%s-%s-%s.pem" % (
			safe_domain_name(cn), # common name, which should be filename safe because it is IDNA-encoded, but in case of a malformed cert make sure it's ok to use as a filename
			cert.not_valid_after.date().isoformat().replace("-", ""), # expiration date
			hexlify(cert.fingerprint(hashes.SHA256())).decode("ascii")[0:8], # fingerprint prefix
			)
		ssl_certificate = os.path.join(ssl_dir, path)

		# Install the certificate.
		os.makedirs(os.path.dirname(ssl_certificate), exist_ok=True)
		shutil.move(fn, ssl_certificate)
	finally:
		# The temporary file only still exists if it was not installed
		# (validation failed or an exception was raised); don't leave it behind.
		if os.path.exists(fn):
			os.unlink(fn)

	ret = ["OK"]

	# When updating the cert for PRIMARY_HOSTNAME, symlink it from the system
	# certificate path, which is hard-coded for various purposes, and then
	# restart postfix and dovecot.
	if domain == env['PRIMARY_HOSTNAME']:
		# Update symlink. lexists() is also true for a dangling symlink, and
		# guarding the unlink avoids a crash when no link exists yet.
		system_ssl_certificate = os.path.join(ssl_dir, 'ssl_certificate.pem')
		if os.path.lexists(system_ssl_certificate):
			os.unlink(system_ssl_certificate)
		os.symlink(ssl_certificate, system_ssl_certificate)

		# Restart postfix and dovecot so they pick up the new file.
		shell('check_call', ["/usr/sbin/service", "postfix", "restart"])
		shell('check_call', ["/usr/sbin/service", "dovecot", "restart"])
		ret.append("mail services restarted")

		# The DANE TLSA record will remain valid so long as the private key
		# hasn't changed. We don't ever change the private key automatically.
		# If the user does it, they must manually update DNS.

	# Update the web configuration so nginx picks up the new certificate file.
	from web_update import do_web_update
	ret.append( do_web_update(env) )
	return "\n".join(ret)
开发者ID:ContainerizeUS,项目名称:mailinabox,代码行数:60,代码来源:ssl_certificates.py


示例2: list_apt_updates

def list_apt_updates(apt_update=True):
	"""Return the upgrades apt would perform, as a list of dicts.

	Each dict has the keys "package", "version" and "current_version".
	Output lines that cannot be parsed are reported with the raw line in
	square brackets as the package name and empty version fields.

	The result is cached in the module-level `_apt_updates` for eight hours.
	When `apt_update` is true the package lists are refreshed first; that
	step is slow, so callers such as the status page may skip it.
	"""
	global _apt_updates

	# Serve the cached answer while it is still fresh.
	max_age = datetime.timedelta(hours=8)
	if _apt_updates is not None:
		if _apt_updates[0] > datetime.datetime.now() - max_age:
			return _apt_updates[1]

	# Refresh the package lists unless the caller opted out.
	if apt_update:
		shell("check_call", ["/usr/bin/apt-get", "-qq", "update"])

	# Ask apt-get, in simulate mode, what an upgrade would do.
	report = shell("check_output", ["/usr/bin/apt-get", "-qq", "-s", "upgrade"])

	inst_line = re.compile(r'^Inst (.*) \[(.*)\] \((\S*)')
	upgrades = []
	for entry in report.split('\n'):
		# Blank lines and "Conf ..." lines carry no useful information.
		if entry.strip() == "" or entry.startswith("Conf "):
			continue

		found = inst_line.match(entry)
		if found is None:
			name, old_version, new_version = "[" + entry + "]", "", ""
		else:
			name, old_version, new_version = found.group(1, 2, 3)
		upgrades.append({ "package": name, "version": new_version, "current_version": old_version })

	# Remember the result, with a timestamp, for later calls.
	_apt_updates = (datetime.datetime.now(), upgrades)
	return upgrades
开发者ID:baltoche,项目名称:mailinabox,代码行数:32,代码来源:status_checks.py


示例3: main

def main():
    """Prepare the build once, then set up and test each database backend."""
    _mkdirs(SRCDIR, INSTALLDIR)
    setup_logging()
    fetch_and_build()

    backends = ('sqlite3', 'mysql')
    for backend in backends:
        # Empty the install directory before each backend run.
        wipe_install_dir = 'rm -rf {}/*'.format(INSTALLDIR)
        shell(wipe_install_dir)
        setup_and_test(backend)
开发者ID:EagleSmith,项目名称:seafile,代码行数:7,代码来源:run.py


示例4: add_mail_user

def add_mail_user(email, pw, privs, env):
	"""Create a mail user account and its standard mailboxes.

	Args:
		email: Address of the new user; IDNA domains are accepted and
			normalized by sanitize_idn_email_address().
		pw: Plain-text password; checked by validate_password() and stored
			as returned by hash_password().
		privs: Newline-separated privilege names, or None/blank for none.
		env: Environment mapping passed on to open_database() and kick().

	Returns:
		An ``(error message, 400)`` tuple on failure (or whatever
		validate_privilege() reports for a bad privilege), otherwise the
		result of ``kick(env, "mail user added")``.
	"""
	# Accept IDNA domain names but normalize to Unicode before going into the database.
	email = sanitize_idn_email_address(email)

	# Validate the address, first in general and then as a user account name.
	if email.strip() == "":
		return ("No email address provided.", 400)
	if not validate_email(email):
		return ("Invalid email address.", 400)
	if not validate_email(email, mode='user'):
		return ("User account email addresses may only use the ASCII letters A-Z, the digits 0-9, underscore (_), hyphen (-), and period (.).", 400)

	validate_password(pw)

	# Validate privileges; the first problem reported is returned as-is.
	privs = [] if privs is None or privs.strip() == "" else privs.split("\n")
	for priv in privs:
		problem = validate_privilege(priv)
		if problem:
			return problem

	# Get the database.
	conn, c = open_database(env, with_connection=True)

	# Hash the password.
	pw = hash_password(pw)

	# Add the user to the database.
	new_row = (email, pw, "\n".join(privs))
	try:
		c.execute("INSERT INTO users (email, password, privileges) VALUES (?, ?, ?)", new_row)
	except sqlite3.IntegrityError:
		return ("User already exists.", 400)

	# Write the database before the next step.
	conn.commit()

	# Create & subscribe the user's INBOX, Trash, Spam, and Drafts folders.
	# * Our sieve rule for spam expects that the Spam folder exists.
	# * Roundcube will show an error if the user tries to delete a message before the Trash folder exists (#359).
	# * K-9 mail will poll every 90 seconds if a Drafts folder does not exist, so create it
	#   to avoid unnecessary polling.

	# Look up which mailboxes already exist: when re-creating a previously
	# deleted user, the old mailboxes are still on disk.
	list_cmd = ["doveadm", "mailbox", "list", "-u", email, "-8"]
	try:
		listing = utils.shell('check_output', list_cmd, capture_stderr=True)
		existing_mboxes = listing.split("\n")
	except subprocess.CalledProcessError as e:
		# Listing failed: remove the row that was just inserted and report why.
		c.execute("DELETE FROM users WHERE email=?", (email,))
		conn.commit()
		return ("Failed to initialize the user: " + e.output.decode("utf8"), 400)

	wanted = ("INBOX", "Trash", "Spam", "Drafts")
	missing = [name for name in wanted if name not in existing_mboxes]
	for folder in missing:
		utils.shell('check_call', ["doveadm", "mailbox", "create", "-u", email, "-s", folder])

	# Update things in case any new domains are added.
	return kick(env, "mail user added")
开发者ID:benschumacher,项目名称:mailinabox,代码行数:60,代码来源:mailconfig.py


示例5: run

def run(args):
  """Removes all Mygrate data (cancels mygrate init)."""
  # `args` is accepted for the command interface but not used here.
  # Refuse to continue unless the repository has been initialised.
  cmds.init.require_init()
  path = repo.repopath() + '/.mygrate'
  # print(...) with one parenthesised argument behaves the same as a
  # Python 2 statement and as a Python 3 function call, so this module no
  # longer fails to compile on Python 3.
  print("Removing everything under %s." % path)
  # NOTE(review): the path is interpolated unquoted into the command string;
  # a repository path containing spaces or shell metacharacters would
  # misbehave if utils.shell runs it through a shell -- confirm.
  utils.shell('rm -rf %s' % path)
  print("Mygrate repo successfully removed. Use mygrate init to reinitialize it.")
开发者ID:mustangostang,项目名称:mygrate,代码行数:7,代码来源:clean.py


示例6: ensure_ssl_certificate_exists

def ensure_ssl_certificate_exists(domain, ssl_key, ssl_certificate, csr_path, env):
	"""Generate a self-signed certificate for `domain` if none exists yet.

	Nothing is done for PRIMARY_HOSTNAME or when `ssl_certificate` already
	exists. See setup/mail.sh for documentation.

	Args:
		domain: Domain name to place in the certificate's CN.
		ssl_key: Path of the existing private key used for the CSR and for
			self-signing.
		ssl_certificate: Path the certificate is written to; its directory
			is created if necessary.
		csr_path: Path the intermediate certificate signing request is
			written to.
		env: Mapping providing ``PRIMARY_HOSTNAME`` and ``CSR_COUNTRY``.

	Raises:
		Whatever shell("check_call", ...) raises if openssl fails.
	"""
	if domain == env['PRIMARY_HOSTNAME']:
		return

	if os.path.exists(ssl_certificate):
		return

	os.makedirs(os.path.dirname(ssl_certificate), exist_ok=True)

	# Generate a new self-signed certificate using the same private key that we already have.

	# Start with a CSR. Request SHA-256 explicitly rather than relying on
	# the digest the installed openssl happens to default to.
	shell("check_call", [
		"openssl", "req", "-new",
		"-key", ssl_key,
		"-out",  csr_path,
		"-sha256",
		"-subj", "/C=%s/ST=/L=/O=/CN=%s" % (env["CSR_COUNTRY"], domain)])

	# And then make the certificate, valid for one year.
	shell("check_call", [
		"openssl", "x509", "-req",
		"-days", "365",
		"-in", csr_path,
		"-signkey", ssl_key,
		"-out", ssl_certificate])
开发者ID:zentra,项目名称:mailinabox,代码行数:28,代码来源:web_update.py


示例7: check_imap_login

	def check_imap_login(self, email, pw, env):
		"""Validate a user's credentials and check for the admin privilege.

		Returns "OK" when the credentials are accepted by doveadm and the
		user has the "admin" privilege; otherwise returns a human-readable
		error string.

		Raises:
			Exception: if get_mail_user_privileges() returns a tuple
				(its error form) for a user that just authenticated.
		"""
		# Sanity check.
		if email == "" or pw == "":
			return "Enter an email address and password."

		# Authenticate.
		try:
			# Use doveadm to check credentials. doveadm will return
			# a non-zero exit status if the credentials are no good,
			# and check_call will raise an exception in that case.
			# NOTE(review): the password is passed as a command-line
			# argument and may be visible in the process list.
			utils.shell('check_call', [
				"/usr/bin/doveadm",
				"auth", "test",
				email, pw
				])
		except Exception:
			# Login failed. Catching Exception rather than using a bare
			# except lets KeyboardInterrupt and SystemExit propagate
			# instead of being reported as a bad password.
			return "Invalid email address or password."

		# Authorize.
		# (This call should never fail on a valid user.)
		privs = get_mail_user_privileges(email, env)
		if isinstance(privs, tuple): raise Exception("Error getting privileges.")
		if "admin" not in privs:
			return "You are not an administrator for this system."

		return "OK"
开发者ID:Jheguy2,项目名称:mailinabox,代码行数:29,代码来源:auth.py


示例8: ensure_ssl_certificate_exists

def ensure_ssl_certificate_exists(domain, ssl_key, ssl_certificate, csr_path, env):
	"""Make sure a non-primary domain has a certificate, self-signing one if needed.

	Returns immediately for PRIMARY_HOSTNAME, when `ssl_certificate` is the
	system certificate path under STORAGE_ROOT, or when the file already
	exists. Otherwise a CSR is written to `csr_path` and signed with
	`ssl_key` to produce `ssl_certificate`. See setup/mail.sh for
	documentation.
	"""
	if domain == env['PRIMARY_HOSTNAME']:
		return

	# Sanity check that shouldn't trigger: a non-primary domain might use the
	# system certificate, but then that certificate should exist anyway.
	system_certificate = os.path.join(env["STORAGE_ROOT"], 'ssl/ssl_certificate.pem')
	if ssl_certificate == system_certificate or os.path.exists(ssl_certificate):
		return

	target_dir = os.path.dirname(ssl_certificate)
	os.makedirs(target_dir, exist_ok=True)

	# Build a new self-signed certificate from the private key we already have.

	# Step 1: the certificate signing request.
	subject = "/C=%s/ST=/L=/O=/CN=%s" % (env["CSR_COUNTRY"], domain)
	csr_command = [
		"openssl", "req", "-new",
		"-key", ssl_key,
		"-out", csr_path,
		"-sha256",
		"-subj", subject,
	]
	shell("check_call", csr_command)

	# Step 2: sign the request with the same key, valid for one year.
	sign_command = [
		"openssl", "x509", "-req",
		"-days", "365",
		"-in", csr_path,
		"-signkey", ssl_key,
		"-out", ssl_certificate,
	]
	shell("check_call", sign_command)
开发者ID:alchen99,项目名称:mailinabox,代码行数:34,代码来源:web_update.py


示例9: ensure_ssl_certificate_exists

def ensure_ssl_certificate_exists(domain, ssl_key, ssl_certificate, env):
	"""Make sure a non-primary domain has a certificate, self-signing one if needed.

	Returns immediately for PRIMARY_HOSTNAME, when `ssl_certificate` is the
	system certificate path under STORAGE_ROOT, or when the file already
	exists. Otherwise the CSR text from create_csr() is written to a
	temporary file and signed with `ssl_key` to produce `ssl_certificate`.
	See setup/mail.sh for documentation.
	"""
	if domain == env['PRIMARY_HOSTNAME']:
		return

	# Sanity check that shouldn't trigger: a non-primary domain might use the
	# system certificate, but then that certificate should exist anyway.
	system_certificate = os.path.join(env["STORAGE_ROOT"], 'ssl/ssl_certificate.pem')
	if ssl_certificate == system_certificate or os.path.exists(ssl_certificate):
		return

	target_dir = os.path.dirname(ssl_certificate)
	os.makedirs(target_dir, exist_ok=True)

	# Build a new self-signed certificate from the private key we already have.
	with tempfile.NamedTemporaryFile(mode="w") as csr_file:
		# Closing the temporary file deletes it, so it stays open -- merely
		# flushed -- until 'openssl x509' has read the CSR.
		csr_text = create_csr(domain, ssl_key, env)
		csr_file.write(csr_text)
		csr_file.flush()

		# Sign the request with the same key, valid for one year.
		sign_command = [
			"openssl", "x509", "-req",
			"-days", "365",
			"-in", csr_file.name,
			"-signkey", ssl_key,
			"-out", ssl_certificate,
		]
		shell("check_call", sign_command)
开发者ID:mboersma,项目名称:mailinabox,代码行数:31,代码来源:web_update.py


示例10: do_web_update

def do_web_update(env):
	"""Regenerate the nginx configuration and reload nginx if it changed.

	The configuration is nginx-top.conf followed by one section per web
	domain, built by make_domain_config() from the nginx.conf and
	nginx-primaryonly.conf templates.

	Returns:
		"" when /etc/nginx/conf.d/local.conf already has the generated
		content (nothing is written or reloaded), otherwise
		"web updated\\n".
	"""
	conf_dir = os.path.join(os.path.dirname(__file__), "../conf")

	def read_conf(name):
		# Read a file from the conf directory, closing the handle promptly
		# instead of leaving that to the garbage collector.
		with open(os.path.join(conf_dir, name)) as f:
			return f.read()

	# Build an nginx configuration file.
	nginx_conf = read_conf("nginx-top.conf")

	# Add configuration for each web domain.
	template1 = read_conf("nginx.conf")
	template2 = read_conf("nginx-primaryonly.conf")
	for domain in get_web_domains(env):
		nginx_conf += make_domain_config(domain, template1, template2, env)

	# Did the file change? If not, don't bother writing & restarting nginx.
	nginx_conf_fn = "/etc/nginx/conf.d/local.conf"
	if os.path.exists(nginx_conf_fn):
		with open(nginx_conf_fn) as f:
			if f.read() == nginx_conf:
				return ""

	# Save the file.
	with open(nginx_conf_fn, "w") as f:
		f.write(nginx_conf)

	# Kick nginx. Since this might be called from the web admin
	# don't do a 'restart'. That would kill the connection before
	# the API returns its response. A 'reload' should be good
	# enough and doesn't break any open connections.
	shell('check_call', ["/usr/sbin/service", "nginx", "reload"])

	return "web updated\n"
开发者ID:alchen99,项目名称:mailinabox,代码行数:28,代码来源:web_update.py


示例11: transformBDTVariables

def _substitute_variable(expr, name, value):
    # Return expr with every whole-identifier occurrence of `name` replaced
    # by `value` in parentheses.
    #
    # A plain str.replace would also rewrite the variable name inside other
    # identifiers (the 'x' in 'exp(x)') and inside numeric literals, and an
    # unparenthesized negative value changes operator precedence
    # ('x**2' -> '-3**2').
    import re
    pattern = r'(?<![A-Za-z0-9_])' + re.escape(name) + r'(?![A-Za-z0-9_])'
    # A callable replacement keeps backslashes in `value` from being
    # interpreted as group references.
    return re.sub(pattern, lambda match: '(' + value + ')', expr)


def transformBDTVariables(infn, outfn, varmap):
    #
    # apply a functional transformation f(x) to all
    # nodes with cuts '< X' such that the node definition becomes '< f(X)'
    #
    # varmap is a map of variable to valid TFormula expression, eg. { 'x' : 'pow(x,2)/1.0e3' }
    #
    # infn is read through xmlreader; the result is written to outfn. If
    # outfn ends in '.gz' the XML is written without that suffix and then
    # compressed with gzip.
    #
    # NOTE(review): the expressions in varmap are evaluated with eval();
    # varmap must never come from an untrusted source.
    #
    gzipped = outfn.endswith('.gz')
    # Strip only the trailing '.gz'; replacing it everywhere would corrupt
    # paths that contain '.gz' elsewhere.
    outxml = outfn[:-len('.gz')] if gzipped else outfn
    rootnode = xmlparser.parse(xmlreader(infn).handle())
    varlist = []
    for v in rootnode.xpath('//Variable'):
        k = v.attrib['Title']
        if k in varmap:
            # Transform the variable's range with the same function.
            for bound in ('Min', 'Max'):
                v.attrib[bound] = str(eval(_substitute_variable(varmap[k], k, v.attrib[bound])))
        varlist.append(k)
    for n in rootnode.xpath('//Node'):
        # IVar indexes into the variables in the order they were listed above.
        cutvar = varlist[int(n.attrib['IVar'])]
        if cutvar in varmap:
            n.attrib['Cut'] = str(eval(_substitute_variable(varmap[cutvar], cutvar, n.attrib['Cut'])))
    # The with-block closes the file even if writing fails.
    with open(outxml, 'w') as hndl:
        hndl.write('<?xml version="1.0"?>\n')
        rootnode.write(hndl)
        hndl.write('\n')
    if gzipped:
        shell('gzip %s'%(outxml))
开发者ID:dtmori,项目名称:HWW,代码行数:27,代码来源:tmvatools.py


示例12: swapBDTVariables

def swapBDTVariables(infn, outfn, varmap, resetRange=False):
    #
    # rename variables using some new expression, for example
    # 'X' -> 'y', useful if BDT is applied to a tree
    # with different structure than training data
    #
    # varmap maps a variable's current Title to its replacement; variables
    # not in varmap are left untouched. With resetRange the Min/Max of each
    # renamed variable are widened to -/+1.0e100.
    #
    # infn is read through xmlreader; the result is written to outfn. If
    # outfn ends in '.gz' the XML is written without that suffix and then
    # compressed with gzip.
    #
    gzipped = outfn.endswith('.gz')
    # Strip only the trailing '.gz'; replacing it everywhere would corrupt
    # paths that contain '.gz' elsewhere.
    outxml = outfn[:-len('.gz')] if gzipped else outfn
    rootnode = xmlparser.parse(xmlreader(infn).handle())
    for v in rootnode.xpath('//Variable'):
        k = v.attrib['Title']
        if k not in varmap:
            continue
        # Every naming attribute gets the new expression.
        for attr in ('Expression', 'Label', 'Title', 'Internal'):
            v.attrib[attr] = varmap[k]
        if resetRange:
            v.attrib['Min'] = '-1.0e100'
            v.attrib['Max'] = '+1.0e100'
    # The with-block closes the file even if writing fails.
    with open(outxml, 'w') as hndl:
        hndl.write('<?xml version="1.0"?>\n')
        rootnode.write(hndl)
        hndl.write('\n')
    if gzipped:
        shell('gzip %s'%(outxml))
开发者ID:dtmori,项目名称:HWW,代码行数:26,代码来源:tmvatools.py


示例13: setup_server

def setup_server(cfg, db):
    '''Unpack the server tarball, run the setup for the chosen database and
    append debug/throttling overrides to seahub_settings.py.

    `db` selects the setup routine: 'mysql' uses autosetup_mysql, anything
    else uses autosetup_sqlite3.
    '''
    info('uncompressing server tarball')
    untar_cmd = 'tar xf seafile-server_{}_x86-64.tar.gz -C {}'.format(
        cfg.version, cfg.installdir)
    shell(untar_cmd)

    if db != 'mysql':
        autosetup_sqlite3(cfg)
    else:
        autosetup_mysql(cfg)

    # Append the overrides to the generated settings file.
    settings_path = join(cfg.installdir, 'conf/seahub_settings.py')
    with open(settings_path, 'a') as fp:
        fp.writelines([
            '\n',
            'DEBUG = True',
            '\n',
            '''\
REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_RATES': {
        'ping': '600/minute',
        'anon': '1000/minute',
        'user': '1000/minute',
    },
}''',
            '\n',
        ])
开发者ID:285452612,项目名称:seafile,代码行数:25,代码来源:autosetup.py


示例14: clone

 def clone(self):
     """Fetch tags if the checkout directory exists, else shallow-clone it."""
     if not exists(self.name):
         # No local copy yet: shallow clone of the configured branch.
         clone_cmd = 'git clone --depth=1 --branch {} {}'.format(
             self.branch, self.url)
         shell(clone_cmd)
         return

     # A local copy exists: fetch from origin, including tags.
     with cd(self.name):
         shell('git fetch origin --tags')
开发者ID:EagleSmith,项目名称:seafile,代码行数:7,代码来源:run.py


示例15: add_mail_user

def add_mail_user(email, pw, privs, env):
	"""Create a mail user account and its standard mailboxes.

	Args:
		email: Address of the new user; must pass validate_email() in
			'user' mode.
		pw: Plain-text password: non-blank, no whitespace, at least four
			characters. It is hashed with doveadm (SHA512-CRYPT).
		privs: Newline-separated privilege names, or None/blank for none.
		env: Environment mapping passed on to open_database() and kick().

	Returns:
		An ``(error message, 400)`` tuple on failure (or whatever
		validate_privilege() reports for a bad privilege), otherwise the
		result of ``kick(env, "mail user added")``.
	"""
	# Validate the email address.
	if email.strip() == "":
		return ("No email address provided.", 400)
	elif not validate_email(email, mode='user'):
		return ("Invalid email address.", 400)

	# Validate the password.
	if pw.strip() == "":
		return ("No password provided.", 400)
	elif re.search(r"[\s]", pw):
		return ("Passwords cannot contain spaces.", 400)
	elif len(pw) < 4:
		return ("Passwords must be at least four characters.", 400)

	# Validate privileges; the first problem reported is returned as-is.
	if privs is not None and privs.strip() != "":
		privs = privs.split("\n")
		for priv in privs:
			problem = validate_privilege(priv)
			if problem:
				return problem
	else:
		privs = []

	# Get the database.
	conn, c = open_database(env, with_connection=True)

	# Hash the password with doveadm.
	hash_cmd = ["/usr/bin/doveadm", "pw", "-s", "SHA512-CRYPT", "-p", pw]
	pw = utils.shell('check_output', hash_cmd).strip()

	# Add the user to the database.
	new_row = (email, pw, "\n".join(privs))
	try:
		c.execute("INSERT INTO users (email, password, privileges) VALUES (?, ?, ?)", new_row)
	except sqlite3.IntegrityError:
		return ("User already exists.", 400)

	# Write the database before the next step.
	conn.commit()

	# Create the user's INBOX, Spam, and Drafts folders, and subscribe them.
	# K-9 mail will poll every 90 seconds if a Drafts folder does not exist, so create it
	# to avoid unnecessary polling.

	# Look up which mailboxes already exist: when re-creating a previously
	# deleted user, the old mailboxes are still on disk.
	list_cmd = ["doveadm", "mailbox", "list", "-u", email, "-8"]
	try:
		listing = utils.shell('check_output', list_cmd, capture_stderr=True)
		existing_mboxes = listing.split("\n")
	except subprocess.CalledProcessError as e:
		# Listing failed: remove the row that was just inserted and report why.
		c.execute("DELETE FROM users WHERE email=?", (email,))
		conn.commit()
		return ("Failed to initialize the user: " + e.output.decode("utf8"), 400)

	wanted = ("INBOX", "Spam", "Drafts")
	missing = [name for name in wanted if name not in existing_mboxes]
	for folder in missing:
		utils.shell('check_call', ["doveadm", "mailbox", "create", "-u", email, "-s", folder])

	# Update things in case any new domains are added.
	return kick(env, "mail user added")
开发者ID:ch000,项目名称:mailinabox,代码行数:59,代码来源:mailconfig.py


示例16: autosetup_sqlite3

def autosetup_sqlite3(cfg):
    """Patch setup-seafile.sh and run it in prompt or automatic mode."""
    setup_script = get_script(cfg, 'setup-seafile.sh')

    # Delete the script's "check_root;" line(s) in place.
    strip_root_check = '''sed -i -e '/^check_root;.*/d' "{}"'''.format(setup_script)
    shell(strip_root_check)

    # cfg.initmode picks the interactive or the unattended setup routine.
    run_setup = setup_sqlite3_prompt if cfg.initmode == 'prompt' else setup_sqlite3_auto
    run_setup(setup_script)
开发者ID:285452612,项目名称:seafile,代码行数:8,代码来源:autosetup.py


示例17: copy_dist

 def copy_dist(self):
     """Build the dist tarball, copy it to SRCDIR and record its version.

     The version is taken from a tarball named ``<name>-<version>.tar.gz``;
     self.version is left untouched when the file name does not match.

     Raises:
         IndexError: if make_dist() left no ``*.tar.gz`` file in the
             current directory.
     """
     self.make_dist()
     tarball = glob.glob('*.tar.gz')[0]
     info('copying %s to %s', tarball, SRCDIR)
     shell('cp {} {}'.format(tarball, SRCDIR))
     # Escape the project name and the literal dots so that regex
     # metacharacters in either cannot match unintended file names.
     pattern = r'{}-(.*)\.tar\.gz$'.format(re.escape(self.name))
     m = re.match(pattern, basename(tarball))
     if m:
         self.version = m.group(1)
开发者ID:EagleSmith,项目名称:seafile,代码行数:8,代码来源:run.py


示例18: migration_9

def migration_9(env):
	"""Add the permitted_senders TEXT column to the aliases table.

	permitted_senders is a list of user account email addresses that are
	permitted to send mail using the alias instead of their own address.
	This was motivated by the addition of #427 ("Reject outgoing mail if
	FROM does not match Login"), which introduced the notion of outbound
	permitted-senders.
	"""
	users_db = os.path.join(env["STORAGE_ROOT"], 'mail/users.sqlite')
	alter_statement = "ALTER TABLE aliases ADD permitted_senders TEXT"
	shell("check_call", ["sqlite3", users_db, alter_statement])
开发者ID:chrisc93,项目名称:mailinabox,代码行数:9,代码来源:migrate.py


示例19: make_dist

 def make_dist(self):
     """Run tools/gen-tarball.py for HEAD at `seafile_version`."""
     # Steps that were already disabled in the command list:
     #   'git add -f media/css/*.css'
     #   'git commit -a -m "%s"' % msg
     tarball_cmd = (
         './tools/gen-tarball.py --version={} --branch=HEAD >/dev/null'
         .format(seafile_version)
     )
     # The command runs with the environment from make_build_env().
     shell(tarball_cmd, env=make_build_env())
开发者ID:EagleSmith,项目名称:seafile,代码行数:9,代码来源:run.py


示例20: run_checks

def run_checks(env, output):
	"""Run the system, network and domain status checks.

	`output` is stored in env["out"] before the checks run, and bind9 is
	restarted first so DNS lookups are not answered from a stale cache.
	"""
	# Clear the DNS cache so our DNS checks are most up to date.
	restart_bind = ["/usr/sbin/service", "bind9", "restart"]
	shell('check_call', restart_bind)

	# Perform the checks, in order, against the same environment.
	env["out"] = output
	run_system_checks(env)
	run_network_checks(env)
	run_domain_checks(env)
开发者ID:quangpham,项目名称:mailinabox,代码行数:9,代码来源:status_checks.py



注:本文中的utils.shell函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.shell_exec函数代码示例发布时间:2022-05-26
下一篇:
Python utils.sharedX函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap