本文整理汇总了Python中utils.shell函数的典型用法代码示例。如果您正苦于以下问题:Python shell函数的具体用法?Python shell怎么用?Python shell使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了shell函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: install_cert
def install_cert(domain, ssl_cert, ssl_chain, env):
    """Validate a certificate + chain for ``domain`` and install it under STORAGE_ROOT/ssl.

    Parameters:
      domain    -- the domain the certificate is being installed for.
      ssl_cert  -- PEM text of the certificate.
      ssl_chain -- PEM text of the chain; it is written below the certificate.
      env       -- settings mapping; ``STORAGE_ROOT`` and ``PRIMARY_HOSTNAME`` are read.

    Returns a human-readable error message when check_certificate() does not
    report "OK". Otherwise returns a newline-joined status string that starts
    with "OK".
    """
    import tempfile

    ssl_dir = os.path.join(env["STORAGE_ROOT"], 'ssl')

    # Write the combined cert+chain to a temporary path so it can be validated.
    # The certificate always goes above the chain.
    fd, fn = tempfile.mkstemp('.pem')
    try:
        # fdopen takes ownership of the descriptor, so it is closed even when
        # encoding or writing fails.
        with os.fdopen(fd, "wb") as pem_file:
            pem_file.write((ssl_cert + '\n' + ssl_chain).encode("ascii"))
    except Exception:
        os.unlink(fn)
        raise

    try:
        # Do validation on the certificate before installing it.
        ssl_private_key = os.path.join(ssl_dir, 'ssl_private_key.pem')
        cert_status, cert_status_details = check_certificate(domain, fn, ssl_private_key)
        if cert_status != "OK":
            if cert_status == "SELF-SIGNED":
                cert_status = "This is a self-signed certificate. I can't install that."
            if cert_status_details is not None:
                cert_status += " " + cert_status_details
            return cert_status

        # Where to put it?
        # Make a unique path for the certificate.
        from cryptography.hazmat.primitives import hashes
        from binascii import hexlify
        cert = load_pem(load_cert_chain(fn)[0])
        _, cn = get_certificate_domains(cert)
        path = "%s-%s-%s.pem" % (
            # common name, which should be filename safe because it is
            # IDNA-encoded, but in case of a malformed cert make sure it's ok
            # to use as a filename
            safe_domain_name(cn),
            # expiration date
            cert.not_valid_after.date().isoformat().replace("-", ""),
            # fingerprint prefix
            hexlify(cert.fingerprint(hashes.SHA256())).decode("ascii")[0:8],
        )
        ssl_certificate = os.path.join(ssl_dir, path)

        # Install the certificate.
        os.makedirs(os.path.dirname(ssl_certificate), exist_ok=True)
        shutil.move(fn, ssl_certificate)
    finally:
        # On every path where the temporary file was not moved into place
        # (rejected certificate, or an exception), don't leave it behind.
        if os.path.exists(fn):
            os.unlink(fn)

    ret = ["OK"]

    # When updating the cert for PRIMARY_HOSTNAME, symlink it from the system
    # certificate path, which is hard-coded for various purposes, and then
    # restart postfix and dovecot.
    if domain == env['PRIMARY_HOSTNAME']:
        # Update symlink. lexists() is used so a dangling link is replaced too,
        # and a missing link (first install) no longer raises.
        system_ssl_certificate = os.path.join(ssl_dir, 'ssl_certificate.pem')
        if os.path.lexists(system_ssl_certificate):
            os.unlink(system_ssl_certificate)
        os.symlink(ssl_certificate, system_ssl_certificate)

        # Restart postfix and dovecot so they pick up the new file.
        shell('check_call', ["/usr/sbin/service", "postfix", "restart"])
        shell('check_call', ["/usr/sbin/service", "dovecot", "restart"])
        ret.append("mail services restarted")

        # The DANE TLSA record will remain valid so long as the private key
        # hasn't changed. We don't ever change the private key automatically.
        # If the user does it, they must manually update DNS.

    # Update the web configuration so nginx picks up the new certificate file.
    from web_update import do_web_update
    ret.append(do_web_update(env))
    return "\n".join(ret)
开发者ID:ContainerizeUS,项目名称:mailinabox,代码行数:60,代码来源:ssl_certificates.py
示例2: list_apt_updates
def list_apt_updates(apt_update=True):
    """Return the packages that a simulated ``apt-get upgrade`` would install.

    Each entry is a dict with "package", "version" and "current_version" keys.
    Output lines that are not recognised are reported with the raw line,
    wrapped in square brackets, as the package name. When ``apt_update`` is
    true, ``apt-get update`` is run first. Results are kept in the
    module-level ``_apt_updates`` cache and reused for 8 hours.
    """
    global _apt_updates

    # See if we have this information cached recently.
    max_age = datetime.timedelta(hours=8)
    cache = _apt_updates
    if cache is not None and cache[0] > datetime.datetime.now() - max_age:
        return cache[1]

    # Refreshing the package list is slow and should be running daily anyway,
    # so callers such as the status checks page can skip it.
    if apt_update:
        shell("check_call", ["/usr/bin/apt-get", "-qq", "update"])

    # Simulate mode lists what an upgrade would do without doing it.
    simulated_install = shell("check_output", ["/usr/bin/apt-get", "-qq", "-s", "upgrade"])

    pkgs = []
    for raw in simulated_install.split('\n'):
        # Blank lines and "Conf ..." lines are not informative.
        if not raw.strip() or re.match(r'^Conf .*', raw):
            continue

        found = re.match(r'^Inst (.*) \[(.*)\] \((\S*)', raw)
        if found is None:
            pkgs.append({ "package": "[" + raw + "]", "version": "", "current_version": "" })
            continue

        name, current, candidate = found.groups()
        pkgs.append({ "package": name, "version": candidate, "current_version": current })

    # Cache for future requests.
    _apt_updates = (datetime.datetime.now(), pkgs)
    return pkgs
开发者ID:baltoche,项目名称:mailinabox,代码行数:32,代码来源:status_checks.py
示例3: main
def main():
    """Prepare the work directories, build, then set up and test each database backend.

    INSTALLDIR is emptied with ``rm -rf`` before every backend run.
    """
    _mkdirs(SRCDIR, INSTALLDIR)
    setup_logging()
    fetch_and_build()

    backends = ('sqlite3', 'mysql')
    for backend in backends:
        wipe_install_dir = 'rm -rf {}/*'.format(INSTALLDIR)
        shell(wipe_install_dir)
        setup_and_test(backend)
开发者ID:EagleSmith,项目名称:seafile,代码行数:7,代码来源:run.py
示例4: add_mail_user
def add_mail_user(email, pw, privs, env):
    """Create a mail user row and make sure its standard mailboxes exist.

    ``privs`` is a newline-separated string of privileges (or None/blank for
    none). Returns a ``(message, 400)`` tuple on a validation or setup
    failure, whatever validate_privilege() returns for a rejected privilege,
    and otherwise the result of ``kick(env, "mail user added")``.
    """
    # accept IDNA domain names but normalize to Unicode before going into database
    email = sanitize_idn_email_address(email)

    # validate email
    if not email.strip():
        return ("No email address provided.", 400)
    if not validate_email(email):
        return ("Invalid email address.", 400)
    if not validate_email(email, mode='user'):
        return ("User account email addresses may only use the ASCII letters A-Z, the digits 0-9, underscore (_), hyphen (-), and period (.).", 400)

    validate_password(pw)

    # validate privileges
    has_privs = privs is not None and privs.strip() != ""
    privs = privs.split("\n") if has_privs else []
    for privilege in privs:
        problem = validate_privilege(privilege)
        if problem:
            return problem

    # get the database
    conn, cursor = open_database(env, with_connection=True)

    # hash the password
    pw = hash_password(pw)

    # add the user to the database
    try:
        cursor.execute(
            "INSERT INTO users (email, password, privileges) VALUES (?, ?, ?)",
            (email, pw, "\n".join(privs)))
    except sqlite3.IntegrityError:
        return ("User already exists.", 400)

    # write database before next step
    conn.commit()

    # Create & subscribe the user's INBOX, Trash, Spam, and Drafts folders.
    # * Our sieve rule for spam expects that the Spam folder exists.
    # * Roundcube will show an error if the user tries to delete a message before the Trash folder exists (#359).
    # * K-9 mail will poll every 90 seconds if a Drafts folder does not exist, so create it
    #   to avoid unnecessary polling.

    # Check if the mailboxes exist before creating them. When creating a user that had previously
    # been deleted, the mailboxes will still exist because they are still on disk.
    list_cmd = ["doveadm", "mailbox", "list", "-u", email, "-8"]
    try:
        listing = utils.shell('check_output', list_cmd, capture_stderr=True)
        existing_mboxes = listing.split("\n")
    except subprocess.CalledProcessError as e:
        # Roll back the row inserted above so a half-created user is not left behind.
        cursor.execute("DELETE FROM users WHERE email=?", (email,))
        conn.commit()
        return ("Failed to initialize the user: " + e.output.decode("utf8"), 400)

    required_folders = ("INBOX", "Trash", "Spam", "Drafts")
    for folder in required_folders:
        if folder in existing_mboxes:
            continue
        utils.shell('check_call', ["doveadm", "mailbox", "create", "-u", email, "-s", folder])

    # Update things in case any new domains are added.
    return kick(env, "mail user added")
开发者ID:benschumacher,项目名称:mailinabox,代码行数:60,代码来源:mailconfig.py
示例5: run
def run (args):
    """Removes all Mygrate data (cancels mygrate init)."""
    cmds.init.require_init()

    # The data lives in the ".mygrate" directory under the repository path.
    mygrate_dir = repo.repopath() + '/.mygrate'

    # Each print takes one parenthesised expression, so the output is the same
    # as the statement form.
    print("Removing everything under %s." % mygrate_dir)
    remove_cmd = 'rm -rf %s' % mygrate_dir
    utils.shell (remove_cmd)
    print("Mygrate repo successfully removed. Use mygrate init to reinitialize it.")
开发者ID:mustangostang,项目名称:mygrate,代码行数:7,代码来源:clean.py
示例6: ensure_ssl_certificate_exists
def ensure_ssl_certificate_exists(domain, ssl_key, ssl_certificate, csr_path, env):
    """Create a self-signed certificate at ``ssl_certificate`` if none exists yet.

    Nothing is done for PRIMARY_HOSTNAME or when the certificate file is
    already present. Otherwise a CSR is written to ``csr_path`` and then
    self-signed with ``ssl_key`` for 365 days. See setup/mail.sh for
    documentation.
    """
    # Only domains besides PRIMARY_HOSTNAME that lack a certificate need one.
    if domain == env['PRIMARY_HOSTNAME'] or os.path.exists(ssl_certificate):
        return

    os.makedirs(os.path.dirname(ssl_certificate), exist_ok=True)

    # Generate a new self-signed certificate using the same private key that
    # we already have. Start with a CSR.
    subject = "/C=%s/ST=/L=/O=/CN=%s" % (env["CSR_COUNTRY"], domain)
    make_csr = [
        "openssl", "req", "-new",
        "-key", ssl_key,
        "-out", csr_path,
        "-subj", subject,
    ]
    shell("check_call", make_csr)

    # And then make the certificate.
    sign_csr = [
        "openssl", "x509", "-req",
        "-days", "365",
        "-in", csr_path,
        "-signkey", ssl_key,
        "-out", ssl_certificate,
    ]
    shell("check_call", sign_csr)
开发者ID:zentra,项目名称:mailinabox,代码行数:28,代码来源:web_update.py
示例7: check_imap_login
def check_imap_login(self, email, pw, env):
    """Validate a user's credentials and confirm they are an administrator.

    Returns "OK" on success, otherwise a human-readable message explaining
    why the login was rejected. Raises Exception when the privilege lookup
    returns a tuple (an error from get_mail_user_privileges).
    """
    # Sanity check.
    if email == "" or pw == "":
        return "Enter an email address and password."

    # Authenticate.
    try:
        # Use doveadm to check credentials. doveadm will return
        # a non-zero exit status if the credentials are no good,
        # and check_call will raise an exception in that case.
        # NOTE(review): the password is passed as a command-line argument,
        # which may be visible to other local users in the process list.
        utils.shell('check_call', [
            "/usr/bin/doveadm",
            "auth", "test",
            email, pw
            ])
    except Exception:
        # Login failed. Only ordinary exceptions are treated as a failed
        # login; KeyboardInterrupt and SystemExit now propagate instead of
        # being reported as bad credentials.
        return "Invalid email address or password."

    # Authorize.
    # (This call should never fail on a valid user.)
    privs = get_mail_user_privileges(email, env)
    if isinstance(privs, tuple):
        raise Exception("Error getting privileges.")
    if "admin" not in privs:
        return "You are not an administrator for this system."

    return "OK"
开发者ID:Jheguy2,项目名称:mailinabox,代码行数:29,代码来源:auth.py
示例8: ensure_ssl_certificate_exists
def ensure_ssl_certificate_exists(domain, ssl_key, ssl_certificate, csr_path, env):
    """Create a self-signed certificate at ``ssl_certificate`` if none exists yet.

    Nothing is done for PRIMARY_HOSTNAME, for the system certificate path, or
    when the certificate file is already present. Otherwise a SHA-256 CSR is
    written to ``csr_path`` and then self-signed with ``ssl_key`` for 365
    days. See setup/mail.sh for documentation.
    """
    system_certificate = os.path.join(env["STORAGE_ROOT"], 'ssl/ssl_certificate.pem')
    skip = (
        domain == env['PRIMARY_HOSTNAME']
        # Sanity check. Shouldn't happen. A non-primary domain might use this
        # certificate, but then the certificate should exist anyway.
        or ssl_certificate == system_certificate
        or os.path.exists(ssl_certificate)
    )
    if skip:
        return

    os.makedirs(os.path.dirname(ssl_certificate), exist_ok=True)

    # Generate a new self-signed certificate using the same private key that
    # we already have. Start with a CSR.
    subject = "/C=%s/ST=/L=/O=/CN=%s" % (env["CSR_COUNTRY"], domain)
    make_csr = [
        "openssl", "req", "-new",
        "-key", ssl_key,
        "-out", csr_path,
        "-sha256",
        "-subj", subject,
    ]
    shell("check_call", make_csr)

    # And then make the certificate.
    sign_csr = [
        "openssl", "x509", "-req",
        "-days", "365",
        "-in", csr_path,
        "-signkey", ssl_key,
        "-out", ssl_certificate,
    ]
    shell("check_call", sign_csr)
开发者ID:alchen99,项目名称:mailinabox,代码行数:34,代码来源:web_update.py
示例9: ensure_ssl_certificate_exists
def ensure_ssl_certificate_exists(domain, ssl_key, ssl_certificate, env):
    """Create a self-signed certificate at ``ssl_certificate`` if none exists yet.

    Nothing is done for PRIMARY_HOSTNAME, for the system certificate path, or
    when the certificate file is already present. Otherwise the CSR text from
    create_csr() is written to a temporary file and self-signed with
    ``ssl_key`` for 365 days. See setup/mail.sh for documentation.
    """
    system_certificate = os.path.join(env["STORAGE_ROOT"], 'ssl/ssl_certificate.pem')
    skip = (
        domain == env['PRIMARY_HOSTNAME']
        # Sanity check. Shouldn't happen. A non-primary domain might use this
        # certificate, but then the certificate should exist anyway.
        or ssl_certificate == system_certificate
        or os.path.exists(ssl_certificate)
    )
    if skip:
        return

    os.makedirs(os.path.dirname(ssl_certificate), exist_ok=True)

    # Generate a new self-signed certificate using the same private key that
    # we already have. The CSR goes to a temporary file.
    with tempfile.NamedTemporaryFile(mode="w") as csr_file:
        csr_text = create_csr(domain, ssl_key, env)
        csr_file.write(csr_text)
        # Flush rather than close: closing deletes the file, and openssl
        # still has to read it below.
        csr_file.flush()

        # And then make the certificate.
        sign_csr = [
            "openssl", "x509", "-req",
            "-days", "365",
            "-in", csr_file.name,
            "-signkey", ssl_key,
            "-out", ssl_certificate,
        ]
        shell("check_call", sign_csr)
开发者ID:mboersma,项目名称:mailinabox,代码行数:31,代码来源:web_update.py
示例10: do_web_update
def do_web_update(env):
    """Rebuild the nginx configuration and reload nginx if it changed.

    Returns "" when the generated configuration matches what is already on
    disk, and "web updated\\n" after writing a new file and reloading nginx.
    """
    def read_conf(name):
        # Read a template from ../conf relative to this module, closing the
        # handle right away instead of leaving it to the garbage collector.
        with open(os.path.join(os.path.dirname(__file__), "../conf", name)) as f:
            return f.read()

    # Build an nginx configuration file.
    nginx_conf = read_conf("nginx-top.conf")

    # Add configuration for each web domain.
    template1 = read_conf("nginx.conf")
    template2 = read_conf("nginx-primaryonly.conf")
    nginx_conf += "".join(
        make_domain_config(domain, template1, template2, env)
        for domain in get_web_domains(env))

    # Did the file change? If not, don't bother writing & restarting nginx.
    nginx_conf_fn = "/etc/nginx/conf.d/local.conf"
    if os.path.exists(nginx_conf_fn):
        with open(nginx_conf_fn) as f:
            if f.read() == nginx_conf:
                return ""

    # Save the file.
    with open(nginx_conf_fn, "w") as f:
        f.write(nginx_conf)

    # Kick nginx. Since this might be called from the web admin
    # don't do a 'restart'. That would kill the connection before
    # the API returns its response. A 'reload' should be good
    # enough and doesn't break any open connections.
    shell('check_call', ["/usr/sbin/service", "nginx", "reload"])

    return "web updated\n"
开发者ID:alchen99,项目名称:mailinabox,代码行数:28,代码来源:web_update.py
示例11: transformBDTVariables
def transformBDTVariables(infn, outfn, varmap):
    """Apply a functional transformation f(x) to the cuts of a BDT weights XML file.

    Every node with a cut '< X' on a mapped variable becomes '< f(X)', and
    the variable's Min/Max are transformed the same way.

    infn   -- input file, opened through xmlreader.
    outfn  -- output file; if it ends in '.gz' the XML is written without that
              suffix and then compressed with gzip.
    varmap -- map of variable title to an expression in that variable,
              eg. { 'x' : 'pow(x,2)/1.0e3' }
    """
    # Strip only a trailing '.gz'. A plain replace() would also remove '.gz'
    # from the middle of the path (e.g. a directory name).
    outxml = outfn[:-len('.gz')] if outfn.endswith('.gz') else outfn

    rootnode = xmlparser.parse(xmlreader(infn).handle())

    # SECURITY: the expressions in varmap are passed to eval(), so varmap must
    # come from a trusted source.
    # NOTE(review): the substitution is purely textual, so a variable name
    # that also appears inside a function name (e.g. 'x' in 'exp(x)') would
    # be replaced there too -- confirm variable names cannot collide.
    varlist = []
    for v in rootnode.xpath('//Variable'):
        k = v.attrib['Title']
        if k in varmap:
            v.attrib['Min'] = str(eval(varmap[k].replace(k, v.attrib['Min'])))
            v.attrib['Max'] = str(eval(varmap[k].replace(k, v.attrib['Max'])))
        # Record every variable so nodes can look it up by their IVar index.
        varlist.append(k)

    for n in rootnode.xpath('//Node'):
        cutvar = varlist[int(n.attrib['IVar'])]
        if cutvar in varmap:
            n.attrib['Cut'] = str(eval(varmap[cutvar].replace(cutvar, n.attrib['Cut'])))

    # The context manager closes the handle even if serialisation fails.
    with open(outxml, 'w') as hndl:
        hndl.write('<?xml version="1.0"?>\n')
        rootnode.write(hndl)
        hndl.write('\n')

    if outfn.endswith('.gz'):
        shell('gzip %s'%(outxml))
开发者ID:dtmori,项目名称:HWW,代码行数:27,代码来源:tmvatools.py
示例12: swapBDTVariables
def swapBDTVariables(infn, outfn, varmap, resetRange=False):
    """Rename variables in a BDT weights XML file.

    Useful if the BDT is applied to a tree with a different structure than
    the training data, for example 'X' -> 'y'.

    infn       -- input file, opened through xmlreader.
    outfn      -- output file; if it ends in '.gz' the XML is written without
                  that suffix and then compressed with gzip.
    varmap     -- map of current variable title to its new expression.
    resetRange -- when true, the Min/Max of each renamed variable are set to
                  -1.0e100 / +1.0e100.
    """
    # Strip only a trailing '.gz'. A plain replace() would also remove '.gz'
    # from the middle of the path (e.g. a directory name).
    outxml = outfn[:-len('.gz')] if outfn.endswith('.gz') else outfn

    rootnode = xmlparser.parse(xmlreader(infn).handle())

    for v in rootnode.xpath('//Variable'):
        k = v.attrib['Title']
        if k not in varmap:
            continue
        # The new expression replaces every naming attribute of the variable.
        for attr in ('Expression', 'Label', 'Title', 'Internal'):
            v.attrib[attr] = varmap[k]
        if resetRange:
            v.attrib['Min'] = '-1.0e100'
            v.attrib['Max'] = '+1.0e100'

    # The context manager closes the handle even if serialisation fails.
    with open(outxml, 'w') as hndl:
        hndl.write('<?xml version="1.0"?>\n')
        rootnode.write(hndl)
        hndl.write('\n')

    if outfn.endswith('.gz'):
        shell('gzip %s'%(outxml))
开发者ID:dtmori,项目名称:HWW,代码行数:26,代码来源:tmvatools.py
示例13: setup_server
def setup_server(cfg, db):
    '''Unpack the server tarball, run the automated setup for the chosen
    database backend, then append debug and throttling settings to
    conf/seahub_settings.py.

    ``db`` selects the MySQL setup when it equals 'mysql'; any other value
    uses the sqlite3 setup.
    '''
    info('uncompressing server tarball')
    untar_cmd = 'tar xf seafile-server_{}_x86-64.tar.gz -C {}'.format(
        cfg.version, cfg.installdir)
    shell(untar_cmd)

    if db != 'mysql':
        autosetup_sqlite3(cfg)
    else:
        autosetup_mysql(cfg)

    throttle_settings = '''\
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_RATES': {
'ping': '600/minute',
'anon': '1000/minute',
'user': '1000/minute',
},
}'''

    # Pieces are appended in this exact order.
    appended = ('\n', 'DEBUG = True', '\n', throttle_settings, '\n')
    settings_path = join(cfg.installdir, 'conf/seahub_settings.py')
    with open(settings_path, 'a') as settings_file:
        for piece in appended:
            settings_file.write(piece)
开发者ID:285452612,项目名称:seafile,代码行数:25,代码来源:autosetup.py
示例14: clone
def clone(self):
    """Fetch tags if a path named ``self.name`` already exists; otherwise clone it.

    A new clone is shallow (depth 1) and taken from ``self.url`` at
    ``self.branch``.
    """
    if not exists(self.name):
        clone_cmd = 'git clone --depth=1 --branch {} {}'.format(self.branch, self.url)
        shell(clone_cmd)
        return

    # Already present locally: just refresh it from origin.
    with cd(self.name):
        shell('git fetch origin --tags')
开发者ID:EagleSmith,项目名称:seafile,代码行数:7,代码来源:run.py
示例15: add_mail_user
def add_mail_user(email, pw, privs, env):
    """Create a mail user row and make sure its standard mailboxes exist.

    ``privs`` is a newline-separated string of privileges (or None/blank for
    none). Returns a ``(message, 400)`` tuple on a validation or setup
    failure, whatever validate_privilege() returns for a rejected privilege,
    and otherwise the result of ``kick(env, "mail user added")``.
    """
    def reject(message):
        # All validation failures are reported with status 400.
        return (message, 400)

    # validate email
    if not email.strip():
        return reject("No email address provided.")
    if not validate_email(email, mode='user'):
        return reject("Invalid email address.")

    # validate password
    if not pw.strip():
        return reject("No password provided.")
    if re.search(r"[\s]", pw):
        return reject("Passwords cannot contain spaces.")
    if len(pw) < 4:
        return reject("Passwords must be at least four characters.")

    # validate privileges
    has_privs = privs is not None and privs.strip() != ""
    privs = privs.split("\n") if has_privs else []
    for privilege in privs:
        problem = validate_privilege(privilege)
        if problem:
            return problem

    # get the database
    conn, cursor = open_database(env, with_connection=True)

    # hash the password
    hash_cmd = ["/usr/bin/doveadm", "pw", "-s", "SHA512-CRYPT", "-p", pw]
    pw = utils.shell('check_output', hash_cmd).strip()

    # add the user to the database
    try:
        cursor.execute(
            "INSERT INTO users (email, password, privileges) VALUES (?, ?, ?)",
            (email, pw, "\n".join(privs)))
    except sqlite3.IntegrityError:
        return reject("User already exists.")

    # write database before next step
    conn.commit()

    # Create the user's INBOX, Spam, and Drafts folders, and subscribe them.
    # K-9 mail will poll every 90 seconds if a Drafts folder does not exist, so create it
    # to avoid unnecessary polling.

    # Check if the mailboxes exist before creating them. When creating a user that had previously
    # been deleted, the mailboxes will still exist because they are still on disk.
    list_cmd = ["doveadm", "mailbox", "list", "-u", email, "-8"]
    try:
        listing = utils.shell('check_output', list_cmd, capture_stderr=True)
        existing_mboxes = listing.split("\n")
    except subprocess.CalledProcessError as e:
        # Roll back the row inserted above so a half-created user is not left behind.
        cursor.execute("DELETE FROM users WHERE email=?", (email,))
        conn.commit()
        return reject("Failed to initialize the user: " + e.output.decode("utf8"))

    required_folders = ("INBOX", "Spam", "Drafts")
    for folder in required_folders:
        if folder in existing_mboxes:
            continue
        utils.shell('check_call', ["doveadm", "mailbox", "create", "-u", email, "-s", folder])

    # Update things in case any new domains are added.
    return kick(env, "mail user added")
开发者ID:ch000,项目名称:mailinabox,代码行数:59,代码来源:mailconfig.py
示例16: autosetup_sqlite3
def autosetup_sqlite3(cfg):
    """Run setup-seafile.sh for sqlite3, in prompt or automatic mode.

    Lines starting with ``check_root;`` are first deleted from the script
    in place with sed.
    """
    script = get_script(cfg, 'setup-seafile.sh')

    strip_root_check = '''sed -i -e '/^check_root;.*/d' "{}"'''.format(script)
    shell(strip_root_check)

    # cfg.initmode selects the driver; anything but 'prompt' means automatic.
    run_setup = setup_sqlite3_prompt if cfg.initmode == 'prompt' else setup_sqlite3_auto
    run_setup(script)
开发者ID:285452612,项目名称:seafile,代码行数:8,代码来源:autosetup.py
示例17: copy_dist
def copy_dist(self):
    """Build the distribution tarball, copy it to SRCDIR and record its version.

    ``self.version`` is set to the part of the tarball's base name between
    ``<self.name>-`` and ``.tar.gz``. It is left untouched when the name does
    not match that pattern. Raises IndexError if make_dist() left no
    ``*.tar.gz`` file in the current directory.
    """
    self.make_dist()
    tarball = glob.glob('*.tar.gz')[0]
    info('copying %s to %s', tarball, SRCDIR)
    shell('cp {} {}'.format(tarball, SRCDIR))

    # Escape the project name and the dots so they match literally and are
    # not interpreted as regex metacharacters.
    pattern = r'{}-(.*)\.tar\.gz'.format(re.escape(self.name))
    m = re.match(pattern, basename(tarball))
    if m:
        self.version = m.group(1)
开发者ID:EagleSmith,项目名称:seafile,代码行数:8,代码来源:run.py
示例18: migration_9
def migration_9(env):
    """Add the ``permitted_senders`` TEXT column to the ``aliases`` table.

    The column stores a list of user account email addresses that are
    permitted to send mail using the alias instead of their own address.
    This was motivated by the addition of #427 ("Reject outgoing mail if
    FROM does not match Login"), which introduced the notion of outbound
    permitted-senders.
    """
    users_db = os.path.join(env["STORAGE_ROOT"], 'mail/users.sqlite')
    alter_statement = "ALTER TABLE aliases ADD permitted_senders TEXT"
    shell("check_call", ["sqlite3", users_db, alter_statement])
开发者ID:chrisc93,项目名称:mailinabox,代码行数:9,代码来源:migrate.py
示例19: make_dist
def make_dist(self):
    """Generate the source tarball for ``seafile_version`` from HEAD.

    The generator's standard output is discarded. The command runs with the
    environment returned by make_build_env().
    """
    gen_tarball = (
        './tools/gen-tarball.py --version={} --branch=HEAD >/dev/null'
        .format(seafile_version)
    )
    # Kept as a sequence so further build commands can be added alongside.
    for command in (gen_tarball,):
        shell(command, env=make_build_env())
开发者ID:EagleSmith,项目名称:seafile,代码行数:9,代码来源:run.py
示例20: run_checks
def run_checks(env, output):
    """Restart bind9, then run the system, network and domain checks in order.

    ``output`` is stored in ``env["out"]`` before the checks run.
    """
    # clear the DNS cache so our DNS checks are most up to date
    restart_bind = ["/usr/sbin/service", "bind9", "restart"]
    shell('check_call', restart_bind)

    # perform checks
    env["out"] = output
    for check in (run_system_checks, run_network_checks, run_domain_checks):
        check(env)
开发者ID:quangpham,项目名称:mailinabox,代码行数:9,代码来源:status_checks.py
注:本文中的utils.shell函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论