本文整理汇总了Python中utils.get_config函数的典型用法代码示例。如果您正苦于以下问题:Python get_config函数的具体用法?Python get_config怎么用?Python get_config使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_config函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: process
def process(self):
"""Process all keystone accounts to sync."""
orig_auth_url = get_config("auth", "keystone_origin")
orig_admin_tenant, orig_admin_user, orig_admin_password = get_config(
"auth", "keystone_origin_admin_credentials"
).split(":")
oa_st_url, orig_admin_token = self.get_swift_auth(
orig_auth_url, orig_admin_tenant, orig_admin_user, orig_admin_password
)
dest_auth_url = get_config("auth", "keystone_dest")
# we assume orig and dest passwd are the same obv synchronized.
dst_st_url, dest_admin_token = self.get_swift_auth(
dest_auth_url, orig_admin_tenant, orig_admin_user, orig_admin_password
)
bare_oa_st_url = oa_st_url[: oa_st_url.find("AUTH_")] + "AUTH_"
bare_dst_st_url = dst_st_url[: dst_st_url.find("AUTH_")] + "AUTH_"
self.keystone_cnx = self.get_ks_auth_orig()
for tenant in self.keystone_cnx.tenants.list():
user_orig_st_url = bare_oa_st_url + tenant.id
user_dst_st_url = bare_dst_st_url + tenant.id
self.sync_account(user_orig_st_url, orig_admin_token, user_dst_st_url, dest_admin_token)
开发者ID:joehakimrahme,项目名称:swiftsync,代码行数:26,代码来源:accounts.py
示例2: start_gpio
def start_gpio(self):
if mock_gpio:
return self.__on_gpio_status_changed(True, mocked_gpio=True)
gpio_mappings = get_config('gpio_mappings')
cmd = [
"python",
os.path.join(BASE_DIR, "core", "interact", "gpio_builder.py"),
json.dumps(gpio_mappings),
str(self.conf['api_port'])
]
bouncetime = get_config('bouncetime')
if bouncetime is not None:
cmd.append(bouncetime)
# signal start
from subprocess import Popen
DEV_NULL = open(os.devnull, 'w')
gpio_process = Popen(cmd, shell=False, stdout=DEV_NULL, stderr=DEV_NULL)
with open(self.conf['d_files']['gpio']['pid'], 'wb+') as gpio_pid:
gpio_pid.write(str(gpio_process.pid))
return self.__on_gpio_status_changed(True)
开发者ID:MetroPictures,项目名称:MPCore,代码行数:26,代码来源:gpio.py
示例3: swift_cnx
def swift_cnx(acc, user):
ks_url = get_config('auth', 'keystone_origin')
cnx = sclient.Connection(ks_url,
user=user,
key=get_config('filler', 'default_user_password'),
tenant_name=acc[0],
auth_version=2)
return cnx
开发者ID:Greyhatno,项目名称:swiftsync,代码行数:8,代码来源:filler.py
示例4: get_ks_auth_orig
def get_ks_auth_orig(self):
"""Get keystone cnx from config."""
orig_auth_url = get_config("auth", "keystone_origin")
cfg = get_config("auth", "keystone_origin_admin_credentials")
(tenant_name, username, password) = cfg.split(":")
return keystoneclient.v2_0.client.Client(
auth_url=orig_auth_url, username=username, password=password, tenant_name=tenant_name
)
开发者ID:joehakimrahme,项目名称:swiftsync,代码行数:9,代码来源:accounts.py
示例5: __init__
def __init__(self):
self.split_chan = False
if get_config("split_audio_channels"):
self.split_chan = True
self.split_map = get_config("split_map")
self.max_audio_level = get_config("max_audio_level")
if self.max_audio_level is None:
self.max_audio_level = MAX_AUDIO_LEVEL
logging.basicConfig(filename=self.conf["d_files"]["audio"]["log"], level=logging.DEBUG)
开发者ID:MetroPictures,项目名称:MPCore,代码行数:12,代码来源:audio_pad.py
示例6: update_cdn
def update_cdn():
use_cdn = get_config("use_cdn")
if use_cdn is not None and use_cdn is False:
print "Not pulling from cdn!"
return
media_manifest, cdn = get_config(['media_manifest', 'cdn'])
# download media from "cdn"
if media_manifest is None or len(media_manifest) == 0:
media_manifest = ["prompts"]
else:
media_manifest.append("prompts")
ftp = FTP()
ftp.connect(cdn['addr'], cdn['port'])
ftp.login(cdn['user'])
ftp.cwd(cdn['home_dir'])
for mm in media_manifest:
out_dir = os.path.join(BASE_DIR, "media", mm)
if mm == "video":
out_dir = os.path.join(out_dir, "viz")
if not os.path.exists(out_dir):
os.makedirs(out_dir)
print "initialized empty directory \"%s\" at %s" % (mm, out_dir)
try:
ftp.cwd(mm)
except Exception as e:
print "directory \"%s\" doesn't exist in CDN" % mm
continue
dir_list = []
ftp.dir(dir_list.append)
dir_list = [d for d in dir_list if d not in [UNPLAYABLE_FILES]]
for l in [l.split(" ")[-1] for l in dir_list]:
out_file = os.path.join(out_dir, l)
try:
with open(out_file, 'wb+') as O:
ftp.retrbinary("RETR %s" % l, O.write)
except Exception as e:
print "could not download %s to %s" % (l, out_file)
print e, type(e)
continue
ftp.cwd('..')
ftp.quit()
开发者ID:MetroPictures,项目名称:MPCore,代码行数:53,代码来源:setup.py
示例7: __init__
def __init__(self):
#signal.signal(signal.SIGINT, self._exit_gracefully)
#signal.signal(signal.SIGQUIT, self._exit_gracefully)
#signal.signal(signal.SIGTERM, self._exit_gracefully)
_config_client = utils.get_config(settings.CONF_FILE, 'client')
_log_level = logging.INFO
if type(_config_client) is dict:
self.migas_server = _config_client.get('server', 'migasfree.org')
self.migas_proxy = _config_client.get('proxy', None)
self.migas_ssl_cert = _config_client.get('ssl_cert', None)
if 'debug' in _config_client:
if _config_client['debug'] == 'True' \
or _config_client['debug'] == '1' \
or _config_client['debug'] == 'On':
self._debug = True
_log_level = logging.DEBUG
_config_packager = utils.get_config(settings.CONF_FILE, 'packager')
if type(_config_packager) is dict:
self.packager_user = _config_packager.get('user', None)
self.packager_pwd = _config_packager.get('password', None)
self.packager_version = _config_packager.get('version', None)
self.packager_store = _config_packager.get('store', None)
logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(message)s',
level=_log_level,
filename=settings.LOG_FILE
)
logging.info('*' * 76)
logging.info('%s in execution', self.CMD)
logging.debug('Config client: %s', _config_client)
logging.debug('Config packager: %s', _config_packager)
# init UrlRequest
_url_base = '%s/api/' % str(self.migas_server)
if self.migas_ssl_cert:
_url_base = '%s://%s' % ('https', _url_base)
else:
_url_base = '%s://%s' % ('http', _url_base)
self._url_request = url_request.UrlRequest(
debug=self._debug,
url_base=_url_base,
proxy=self.migas_proxy,
info_keys={
'path': settings.KEYS_PATH,
'private': self.PRIVATE_KEY,
'public': self.PUBLIC_KEY
},
cert=self.migas_ssl_cert
)
开发者ID:jpueyob,项目名称:migasfree-client,代码行数:53,代码来源:upload.py
示例8: _create_user
def _create_user(account_name, account_id):
user = get_rand_str(mode='user_')
# Create a user in that tenant
uid = client.users.create(user,
get_config('filler',
'default_user_password'),
get_config('filler', 'default_user_email'),
account_id)
# Get swift_operator_role id
roleid = [role.id for role in client.roles.list()
if role.name == get_config('filler', 'swift_operator_role')]
roleid = roleid[0]
# Add tenant/user in swift operator role/group
client.roles.add_user_role(uid.id, roleid, account_id)
return (user, uid.id, roleid)
开发者ID:Greyhatno,项目名称:swiftsync,代码行数:15,代码来源:filler.py
示例9: team_accept_invite_request
def team_accept_invite_request():
params = utils.flat_multi(request.form)
_user = user.get_user().first()
if not user.in_team(_user):
raise WebException("You must be in a team!")
_team = get_team(tid=_user.tid).first()
tid = _team.tid
if _user.uid != _team.owner:
raise WebException("You must be the captain of your team to rescind invitations!")
if _team.finalized:
raise WebException("This team is finalized.")
if len(_team.get_members()) >= utils.get_config("team_size"):
raise WebException("Your team is full.")
uid = params.get("uid")
_user2 = user.get_user(uid=uid).first()
if user.in_team(_user2):
raise WebException("This user is already in a team!")
invitation = TeamInvitations.query.filter_by(rtype=1, frid=_user2.uid, toid=tid).first()
if invitation is None:
raise WebException("Invitation doesn't exist.")
with app.app_context():
_user2 = Users.query.filter_by(uid=_user2.uid).first()
_user2.tid = tid
db.session.delete(invitation)
invitation2 = TeamInvitations.query.filter_by(rtype=0, frid=tid, toid=_user2.uid).first()
if invitation2 is not None:
db.session.delete(invitation2)
db.session.commit()
db.session.close()
return { "success": 1, "message": "Success!" }
开发者ID:EasyCTF,项目名称:openctf-docker,代码行数:35,代码来源:team.py
示例10: team_accept_invite
def team_accept_invite():
params = utils.flat_multi(request.form)
_user = user.get_user().first()
if user.in_team(_user):
raise WebException("You're already in a team!")
tid = params.get("tid")
_team = get_team(tid=tid).first()
if _team is None:
raise WebException("Team not found.")
if len(_team.get_members()) >= utils.get_config("team_size"):
raise WebException("This team is full.")
invitation = TeamInvitations.query.filter_by(rtype=0, frid=tid, toid=_user.uid).first()
if invitation is None:
raise WebException("Invitation doesn't exist.")
with app.app_context():
_user = Users.query.filter_by(uid=_user.uid).first()
_user.tid = tid
db.session.delete(invitation)
invitation2 = TeamInvitations.query.filter_by(rtype=1, frid=_user.uid, toid=tid).first()
if invitation2 is not None:
db.session.delete(invitation2)
db.session.add(Activity(_user.uid, 6, _team.tid, -1))
db.session.commit()
db.session.close()
return { "success": 1, "message": "Success!" }
开发者ID:EasyCTF,项目名称:OpenCTF,代码行数:30,代码来源:team.py
示例11: set_autostart_info
def set_autostart_info():
info = get_config('info')
if info is None:
return False
# setup auto-start
for f in [".profile", ".mp_profile"]:
Popen(["cp", os.path.join(BASE_DIR, "core", "lib", "autostart", f), os.path.expanduser('~')])
with open(os.path.join(os.path.expanduser('~'), ".mp_autostart"), 'wb+') as A:
A.write("cd %s && python %s.py --start" % (info['dir'], info['module']))
with open(os.path.join(os.path.expanduser('~'), ".profile"), 'ab') as A:
A.write("\nsleep 15 && ~/.mp_autostart")
Popen(["sudo", "cp", os.path.join(BASE_DIR, "core", "lib", "autostart", "rc.local"), os.path.join("/", "etc", "rc.local")])
# set media info
if "sculpture" in info.keys():
info_directives = [
"export SCULPTURE_TITLE=\"%s\"" % info['sculpture']['title'],
"export SCULPTURE_LINK=\"%s\"" % info['sculpture']['link']
]
with open(os.path.join(os.path.expanduser('~'), ".mp_profile"), 'ab') as I:
I.write("\n%s" % "\n".join(info_directives))
return True
开发者ID:MetroPictures,项目名称:MPCore,代码行数:28,代码来源:setup.py
示例12: get_feats_in_partitions
def get_feats_in_partitions():
"""
Extracts features from all dataset and split them in train validation and
test sets
"""
conf = utils.get_config()
paths = utils.get_paths()
rows = utils.load_csv()
filters = conf['filters']
region_size = conf['region_size']
region_stride = conf['region_stride']
filtered_rows = [
row for row in rows if utils.check_filter(row, conf['filters'])]
train_rows, valid_rows, test_rows = utils.split_dataset(
filtered_rows, conf['valid_percent'], conf['test_percent'], rng=conf['rng_seed'])
conv = get_fprop_fn(False)
print 'Getting features from train...'
X_train = get_feats_from_rows(
train_rows, conv, conf['stride'])
print 'Getting features from valid...'
X_valid = get_feats_from_rows(
valid_rows, conv, conf['stride'])
print 'Getting features from test...'
X_test = get_feats_from_rows(
test_rows, conv, conf['stride'])
y_train = [row['classification'] == 'Malign' for row in train_rows]
y_valid = [row['classification'] == 'Malign' for row in valid_rows]
y_test = [row['classification'] == 'Malign' for row in test_rows]
return X_train, y_train, X_valid, y_valid, X_test, y_test
开发者ID:johnarevalo,项目名称:cnn-bcdr,代码行数:31,代码来源:fe_extraction.py
示例13: main
def main():
""" Main entry point.
Used in the console script we setup.
"""
from zulipRequestHandler import ZulipRequestHandler
from utils import get_config
config = get_config()
ZULIP_USERNAME = config.get('zulip', 'username')
ZULIP_API_KEY = config.get('zulip', 'api_key')
LED_SCREEN_ADDRESS = config.get('main', 'led_screen_address')
zulipRequestHandler = ZulipRequestHandler(ZULIP_USERNAME, ZULIP_API_KEY)
led_bot = LEDBot(
address=LED_SCREEN_ADDRESS, listeners=[zulipRequestHandler]
)
## Uncomment the lines below to be able to test the bot from the CLI.
# from cli_handler import CLIHandler
# led_bot = LEDBot(
# address=LED_SCREEN_ADDRESS,
# listeners=[CLIHandler(), zulipRequestHandler]
# )
led_bot.run()
开发者ID:dariajung,项目名称:LED-bot,代码行数:29,代码来源:bot_scheduler.py
示例14: get_feats_from_imagenet_in_partitions
def get_feats_from_imagenet_in_partitions():
conf = utils.get_config()
imagenet_data = os.path.join(
conf['models_path'], 'decafnet', 'imagenet.decafnet.epoch90')
imagenet_meta = os.path.join(
conf['models_path'], 'decafnet', 'imagenet.decafnet.meta')
net = DecafNet(imagenet_data, imagenet_meta)
rows = utils.get_filtered_rows()
sets = utils.split_dataset(
rows, conf['valid_percent'], conf['test_percent'], rng=conf['rng_seed'])
feats = []
ys = []
for s in sets:
X = np.zeros((len(s), 4096))
y = np.zeros(len(s))
for i, row in enumerate(s):
try:
log.info('processing %i-th of %i' % (i, len(s)))
origin, im = utils.extract_roi(row, 30, True)
scores = net.classify(np.asarray(im), center_only=True)
X[i] = net.feature('fc7_cudanet_out')
y[i] = utils.is_positive(row)
except:
continue
feats.append(X)
ys.append(y)
return feats[0], ys[0], feats[1], ys[1], feats[2], ys[2]
开发者ID:johnarevalo,项目名称:cnn-bcdr,代码行数:28,代码来源:fe_extraction.py
示例15: main
def main():
utils.check_python()
# fix py2exe
if hasattr(sys, "frozen") and sys.frozen in \
("windows_exe", "console_exe"):
p = os.path.dirname(os.path.abspath(sys.executable))
os.chdir(p)
config = utils.get_config(True)
utils.print_shadowsocks()
encrypt.init_table(config['password'], config['method'])
try:
logging.info("starting local at %s:%d" %
(config['local_address'], config['local_port']))
tcp_server = tcprelay.TCPRelay(config, True)
udp_server = udprelay.UDPRelay(config, True)
loop = eventloop.EventLoop()
tcp_server.add_to_loop(loop)
udp_server.add_to_loop(loop)
loop.run()
except (KeyboardInterrupt, IOError, OSError) as e:
logging.error(e)
os._exit(0)
开发者ID:HAMIDx9,项目名称:shadowsocks,代码行数:28,代码来源:local.py
示例16: __init__
def __init__(self):
api_port, num_processes, redis_port, rpi_id, custom_test_pad = \
get_config(['api_port', 'num_processes', 'redis_port', 'rpi_id', 'custom_test_pad'])
self.conf = {
'rpi_id' : rpi_id,
'd_files' : {
'api' : {
'pid' : os.path.join(BASE_DIR, ".monitor", "server.pid.txt"),
'log' : os.path.join(BASE_DIR, ".monitor", "%s.log.txt" % rpi_id),
'ports' : api_port
},
'gpio' : {
'log' : os.path.join(BASE_DIR, ".monitor", "%s.log.txt" % rpi_id),
'pid' : os.path.join(BASE_DIR, ".monitor", "gpio.pid.txt")
},
'ivr' : {
'log' : os.path.join(BASE_DIR, ".monitor", "%s.log.txt" % rpi_id)
},
'module' : {
'log' : os.path.join(BASE_DIR, ".monitor", "%s.log.txt" % rpi_id),
'pid' : os.path.join(BASE_DIR, ".monitor", "%s.pid.txt" % rpi_id)
},
'ap_recorder' : {
'pid' : os.path.join(BASE_DIR, ".monitor", "ap_recorder.pid.txt")
},
'ap_player' : {
'pid' : os.path.join(BASE_DIR, ".monitor", "ap_player.pid.txt")
},
'audio' : {
'pid' : os.path.join(BASE_DIR, ".monitor", "audio.pid.txt"),
'log' : os.path.join(BASE_DIR, ".monitor", "%s.log.txt" % rpi_id)
}
},
'api_port' : api_port,
'num_processes' : num_processes,
'redis_port' : redis_port,
'media_dir' : os.path.join(BASE_DIR, "core", "media"),
'crontab' : os.path.join(BASE_DIR, '.monitor', 'crontab')
}
if custom_test_pad is not None:
self.test_pad_dir = os.path.join(BASE_DIR, custom_test_pad)
print "CUSTOM TEST PAD: %s" % self.test_pad_dir
else:
self.test_pad_dir = os.path.join(BASE_DIR, "core", "test_pad")
self.routes = [
("/", self.TestHandler),
(r'/js/(.*)', tornado.web.StaticFileHandler, \
{ 'path' : os.path.join(self.test_pad_dir, "js")}),
("/status", self.StatusHandler),
("/pick_up", self.PickUpHandler),
("/hang_up", self.HangUpHandler),
(r'/mapping/(\d+)', self.MappingHandler)
]
self.db = redis.StrictRedis(host='localhost', port=self.conf['redis_port'], db=0)
logging.basicConfig(filename=self.conf['d_files']['api']['log'], level=logging.DEBUG)
开发者ID:MetroPictures,项目名称:MPCore,代码行数:60,代码来源:api.py
示例17: _securedrop_crawl
def _securedrop_crawl():
config = get_config()['crawler']
if config.getboolean("use_database"):
fpdb = RawStorage()
class_data = fpdb.get_onions(config["hs_history_lookback"])
else:
fpdb = None
with open(join(_log_dir, config["class_data"]), 'rb') as pj:
class_data = pickle.load(pj)
nonmonitored_name, monitored_name = class_data.keys()
nonmonitored_class, monitored_class = class_data.values()
with Crawler(page_load_timeout=config.getint("page_load_timeout"),
wait_on_page=config.getint("wait_on_page"),
wait_after_closing_circuits=config.getint("wait_after_closing_circuits"),
restart_on_sketchy_exception=config.getboolean("restart_on_sketchy_exception"),
db_handler=fpdb,
torrc_config={"CookieAuthentication": "1",
"EntryNodes": config["entry_nodes"]}) as crawler:
crawler.crawl_monitored_nonmonitored(monitored_class,
nonmonitored_class,
monitored_name=monitored_name,
nonmonitored_name=nonmonitored_name,
ratio=config.getint("monitored_nonmonitored_ratio"))
开发者ID:freedomofpress,项目名称:FingerprintSecureDrop,代码行数:25,代码来源:crawler.py
示例18: _read_conf_file
def _read_conf_file(self):
_config = utils.get_config(settings.CONF_FILE, 'client')
_log_level = logging.INFO
self.migas_version = utils.get_mfc_version()
self.migas_computer_name = utils.get_mfc_computer_name()
if type(_config) is dict:
self.migas_server = _config.get('server', 'migasfree.org')
self.migas_proxy = _config.get('proxy', None)
self.migas_ssl_cert = _config.get('ssl_cert', None)
self.migas_gui_verbose = True # by default
if 'gui_verbose' in _config:
if _config['gui_verbose'] == 'False' \
or _config['gui_verbose'] == '0' \
or _config['gui_verbose'] == 'Off':
self.migas_gui_verbose = False
if 'debug' in _config:
if _config['debug'] == 'True' \
or _config['debug'] == '1' \
or _config['debug'] == 'On':
self._debug = True
_log_level = logging.DEBUG
# http://www.lightbird.net/py-by-example/logging.html
logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(module)s - %(funcName)s - %(message)s',
level=_log_level,
filename=settings.LOG_FILE
)
logging.info('*' * 20)
logging.info('%s in execution', self.CMD)
logging.debug('Config: %s', _config)
开发者ID:jpueyob,项目名称:migasfree-client,代码行数:34,代码来源:client.py
示例19: start_tracker
def start_tracker():
"""Start the Torrent Tracker.
"""
# parse commandline options
parser = OptionParser()
parser.add_option('-p', '--port', help='Tracker Port', default=0)
parser.add_option('-b', '--background', action='store_true', default=False,
help='Start in background')
parser.add_option('-d', '--debug', action='store_true', default=False,
help='Debug mode')
(options, args) = parser.parse_args()
# setup directories
utils.create_pytt_dirs()
# setup logging
utils.setup_logging(options.debug)
try:
# start the torrent tracker
run_app(int(options.port) or utils.get_config().getint('tracker',
'port'))
except KeyboardInterrupt:
logging.info('Tracker Stopped.')
utils.close_db()
sys.exit(0)
except Exception, ex:
logging.fatal('%s' % str(ex))
utils.close_db()
sys.exit(-1)
开发者ID:kholia,项目名称:Pytt,代码行数:29,代码来源:tracker.py
示例20: __init__
def __init__(self):
self.choices = [
"Whole Phamerated Phage",
"Whole Unphamerated Phage",
"One Phamerated Gene",
"One Unphamerated Gene",
"Pham",
]
# self.set_border_width(10)
self.config_info = utils.get_config()
builder = Gtk.Builder()
builder.add_from_file(utils.glade_file)
self.window = builder.get_object("StartWindow")
self.window.set_icon_from_file(utils.icon_file)
choice_box = builder.get_object("choicebox")
choice_list = Gtk.ListStore(str)
for choice in self.choices:
choice_list.append([choice])
choice_box.set_model(choice_list)
renderer = Gtk.CellRendererText()
choice_box.pack_start(renderer, True)
choice_box.add_attribute(renderer, "text", 0)
choice_box.set_active(0)
builder.connect_signals(self)
self.window.show_all()
# self.vbox.pack_start(vbox, True, True, 0)
# self.config_info = {}
# print utils.get_configuration
self.check_blast_type()
开发者ID:anthonytw,项目名称:starterator,代码行数:32,代码来源:uiStart.py
注:本文中的utils.get_config函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论