本文整理汇总了Python中spacewalk.server.rhnSQL.initDB函数的典型用法代码示例。如果您正苦于以下问题:Python initDB函数的具体用法?Python initDB怎么用?Python initDB使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了initDB函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: main
def main():
rhnSQL.initDB()
blob_values1 = [
# Regular update
[1, 1, 'value 11', 'value 12', 1],
[2, 1, 'value 21', 'value 22', 2],
# Update with one of the primary keys being None
[3, None, 'value 31', 'value 32', 3],
[4, None, 'value 41', 'value 42', 4],
# Test for writing an empty string into the blob
[5, 5, '', 'value 52', 5],
# Test for writing a shorter string into the blob
[6, 6, 'value 61', 'value 62', 6],
]
newval1_1 = 'new value 11'
newval1_2 = 'new value 12'
newval3_1 = 'new value 31 ' * 1024
newval3_2 = 'new value 32' * 2048
newval5_1 = 'new value 51'
newval5_2 = ''
newval6_1 = 'v61'
newval6_2 = 'v61'
blob_values2 = blob_values1[:]
for r in [0, 2, 4, 5]:
# Copy the old values
blob_values2[r] = blob_values1[r][:]
blob_values2[0][2:5] = [newval1_1, newval1_2, 11]
blob_values2[2][2:5] = [newval3_1, newval3_2, 2]
blob_values2[4][2:5] = [newval5_1, newval5_2, 33]
blob_values2[5][2:5] = [newval6_1, newval6_2, 4]
test_blob_update = Table("test_blob_update",
fields={
'id1': DBint(),
'id2': DBint(),
'val1': DBblob(),
'val2': DBblob(),
'nval': DBint(),
},
# Setting the nullable column to be the first one, to force a specific codepath
pk=['id2', 'id1'],
nullable=['id2'],
)
fields = ['id1', 'id2', 'val1', 'val2', 'nval']
setup(test_blob_update, blob_values1, fields)
print("Insert test")
verify(blob_values1)
t = TableUpdate(test_blob_update, rhnSQL)
rows = [0, 2, 4, 5]
values = _build_update_hash(fields, blob_values2, rows)
t.query(values)
rhnSQL.commit()
print("Updates test")
verify(blob_values2)
开发者ID:BlackSmith,项目名称:spacewalk,代码行数:60,代码来源:test_blob_update.py
示例2: main
def main():
global options_table, debug, verbose
parser = OptionParser(option_list=options_table)
(options, args) = parser.parse_args()
if args:
for arg in args:
sys.stderr.write("Not a valid option ('%s'), try --help\n" % arg)
sys.exit(-1)
if options.verbose:
verbose = 1
if options.debug:
debug = 1
if not options.db:
sys.stderr.write("--db not specified\n")
sys.exit(1)
print "Connecting to %s" % options.db
rhnSQL.initDB(options.db)
if options.update_filer:
process_package_data()
if options.update_sha256:
process_sha256_packages()
if options.update_kstrees:
process_kickstart_trees()
开发者ID:bjmingyang,项目名称:spacewalk,代码行数:32,代码来源:updatePackages.py
示例3: main
def main():
global debug, verbose
parser = OptionParser(option_list=options_table)
(options, args) = parser.parse_args()
if args:
for arg in args:
sys.stderr.write("Not a valid option ('%s'), try --help\n" % arg)
sys.exit(-1)
if options.verbose:
initLOG("stdout", options.verbose or 0)
verbose = 1
if options.debug:
initLOG(CFG.LOG_FILE, options.debug or 0)
debug = 1
rhnSQL.initDB()
if options.update_filer:
process_package_data()
if options.update_sha256:
process_sha256_packages()
if options.update_kstrees:
process_kickstart_trees()
if options.update_package_files:
process_package_files()
if options.update_changelog:
process_changelog()
开发者ID:BlackSmith,项目名称:spacewalk,代码行数:35,代码来源:updatePackages.py
示例4: getParentsChilds
def getParentsChilds(b_only_custom=False):
initCFG('server.satellite')
rhnSQL.initDB()
sql = """
select c1.label, c2.label parent_channel, c1.id
from rhnChannel c1 left outer join rhnChannel c2 on c1.parent_channel = c2.id
order by c2.label desc, c1.label asc
"""
h = rhnSQL.prepare(sql)
h.execute()
d_parents = {}
while 1:
row = h.fetchone_dict()
if not row:
break
if not b_only_custom or rhnChannel.isCustomChannel(row['id']):
parent_channel = row['parent_channel']
if not parent_channel:
d_parents[row['label']] = []
else:
# If the parent is not a custom channel treat the child like
# it's a parent for our purposes
if parent_channel not in d_parents:
d_parents[row['label']] = []
else:
d_parents[parent_channel].append(row['label'])
return d_parents
开发者ID:u-s-p,项目名称:spacewalk,代码行数:30,代码来源:reposync.py
示例5: deactivate
def deactivate():
"""Function to remove certificates and manifest repositories from DB"""
rhnSQL.initDB()
log(0, "Removing certificates...")
Activation._remove_certificates()
log(0, "Removing manifest repositories...")
Activation._remove_repositories()
开发者ID:jdobes,项目名称:spacewalk,代码行数:7,代码来源:activation.py
示例6: getParentsChilds
def getParentsChilds():
initCFG('server')
rhnSQL.initDB()
sql = """
select c1.label, c2.label parent_channel, c1.id
from rhnChannel c1 left outer join rhnChannel c2 on c1.parent_channel = c2.id
order by c2.label desc, c1.label asc
"""
h = rhnSQL.prepare(sql)
h.execute()
d_parents = {}
while 1:
row = h.fetchone_dict()
if not row:
break
if rhnChannel.isCustomChannel(row['id']):
parent_channel = row['parent_channel']
if not parent_channel:
d_parents[row['label']] = []
else:
d_parents[parent_channel].append(row['label'])
return d_parents
开发者ID:cliffy94,项目名称:spacewalk,代码行数:25,代码来源:reposync.py
示例7: __init__
def __init__(self, local_mount_point=None):
rhnSQL.initDB()
self.local_mount_point = local_mount_point
self.repository_tree = CdnRepositoryTree()
self._populate_repository_tree()
f = None
try:
try:
# Channel to repositories mapping
f = open(constants.CONTENT_SOURCE_MAPPING_PATH, 'r')
self.content_source_mapping = json.load(f)
f.close()
# Channel to kickstart repositories mapping
f = open(constants.KICKSTART_SOURCE_MAPPING_PATH, 'r')
self.kickstart_source_mapping = json.load(f)
f.close()
# Kickstart metadata
f = open(constants.KICKSTART_DEFINITIONS_PATH, 'r')
self.kickstart_metadata = json.load(f)
f.close()
except IOError:
e = sys.exc_info()[1]
raise CdnMappingsLoadError("Problem with loading file: %s" % e)
finally:
if f is not None:
f.close()
开发者ID:phurrelmann,项目名称:spacewalk,代码行数:29,代码来源:repository.py
示例8: main
def main():
initCFG("server.app")
rhnSQL.initDB('rhnuser/[email protected]')
channel = { 'label' : 'mibanescu-test2' }
orgid = 1198839
package_template = {
'name' : 'useless',
'version' : '1.0.0',
'arch' : 'noarch',
'org_id' : orgid,
}
batch = []
p = importLib.IncompletePackage()
p.populate(package_template)
p['release'] = '2'
p['channels'] = [channel]
batch.append(p)
p = importLib.IncompletePackage()
p.populate(package_template)
p['release'] = '3'
p['channels'] = [channel]
batch.append(p)
backend = backendOracle.OracleBackend()
cps = packageImport.ChannelPackageSubscription(batch, backend,
caller="misa.testing", strict=1)
cps.run()
print cps.affected_channel_packages
开发者ID:bjmingyang,项目名称:spacewalk,代码行数:32,代码来源:test_strict_channel_package_subscription.py
示例9: getISSCurrentMaster
def getISSCurrentMaster():
rhnSQL.initDB()
master = rhnSQL.fetchone_dict(
"select label from rhnISSMaster where is_current_master = 'Y'")
if not master:
return None
return master['label']
开发者ID:m47ik,项目名称:uyuni,代码行数:7,代码来源:suseLib.py
示例10: __init__
def __init__(self, local_mount_point=None, client_cert_id=None):
rhnSQL.initDB()
self.local_mount_point = local_mount_point
self.repository_tree = CdnRepositoryTree()
self._populate_repository_tree(client_cert_id=client_cert_id)
f = None
try:
try:
# Channel to repositories mapping
f = open(constants.CONTENT_SOURCE_MAPPING_PATH, 'r')
self.content_source_mapping = json.load(f)
f.close()
# Channel to kickstart repositories mapping
f = open(constants.KICKSTART_SOURCE_MAPPING_PATH, 'r')
self.kickstart_source_mapping = json.load(f)
f.close()
# Kickstart metadata
f = open(constants.KICKSTART_DEFINITIONS_PATH, 'r')
self.kickstart_metadata = json.load(f)
f.close()
except IOError:
e = sys.exc_info()[1]
log(1, "Ignoring channel mappings: %s" % e)
self.content_source_mapping = {}
self.kickstart_source_mapping = {}
self.kickstart_metadata = {}
finally:
if f is not None:
f.close()
self.__init_repository_to_channels_mapping()
开发者ID:jdobes,项目名称:spacewalk,代码行数:34,代码来源:repository.py
示例11: main
def main(self):
parser = OptionParser(option_list=options_table)
(self.options, _args) = parser.parse_args()
rhnSQL.initDB()
self._channels_hash = self._get_channels()
package_ids = self._get_packages()
if package_ids is None:
return 1
if self.options.backup_file:
self._backup_packages(package_ids, self.options.backup_file)
try:
self._add_package_header_values(package_ids)
except:
rhnSQL.rollback()
raise
if self.options.commit:
print "Commiting work"
rhnSQL.commit()
else:
print "Rolling back"
rhnSQL.rollback()
开发者ID:TJM,项目名称:spacewalk,代码行数:28,代码来源:satComputePkgHeaders.py
示例12: isAllowedSlave
def isAllowedSlave(hostname):
rhnSQL.initDB()
if not rhnSQL.fetchone_dict("select 1 from rhnISSSlave where slave = :hostname and enabled = 'Y'",
hostname=idn_puny_to_unicode(hostname)):
log_error('Server "%s" is not enabled for ISS.' % hostname)
return False
return True
开发者ID:m47ik,项目名称:uyuni,代码行数:7,代码来源:suseLib.py
示例13: test_failed_connection
def test_failed_connection(self):
# Connect to localhost and look for db on a totally bogus port, this
# makes the test faster.
host = "localhost"
username = "x"
password = "y"
database = "z"
port = 9000
self.assertRaises(
rhnSQL.SQLConnectError,
rhnSQL.initDB,
"oracle",
host,
port,
database,
username,
password
)
try:
rhnSQL.initDB(
backend="oracle",
username=DB_SETTINGS["user"],
password=DB_SETTINGS["password"],
database=DB_SETTINGS["database"]
)
except:
self.fail(
"Exception raised while trying to connect to the db using proper settings. That's not expected to happen.")
开发者ID:flavio,项目名称:spacewalk,代码行数:30,代码来源:test_exceptions.py
示例14: setUp
def setUp(self):
rhnSQL.initDB(
backend="oracle",
username=DB_SETTINGS["user"],
password=DB_SETTINGS["password"],
database=DB_SETTINGS["database"]
)
开发者ID:m47ik,项目名称:uyuni,代码行数:7,代码来源:test_rhnChannel.py
示例15: __init__
def __init__(self):
rhnSQL.initDB()
initCFG('server.satellite')
# Channel families mapping to channels
with open(constants.CHANNEL_FAMILY_MAPPING_PATH, 'r') as f:
self.families = json.load(f)
# Channel metadata
with open(constants.CHANNEL_DEFINITIONS_PATH, 'r') as f:
self.channel_metadata = json.load(f)
# Dist/Release channel mapping
with open(constants.CHANNEL_DIST_MAPPING_PATH, 'r') as f:
self.channel_dist_mapping = json.load(f)
# Channel to repositories mapping
with open(constants.CONTENT_SOURCE_MAPPING_PATH, 'r') as f:
self.content_source_mapping = json.load(f)
# Map channels to their channel family
self.channel_to_family = {}
for family in self.families:
for channel in self.families[family]['channels']:
self.channel_to_family[channel] = family
# Set already synced channels
h = rhnSQL.prepare("""
select label from rhnChannel where org_id is null
""")
h.execute()
channels = h.fetchall_dict() or []
self.synced_channels = [ch['label'] for ch in channels]
开发者ID:jiridostal,项目名称:spacewalk,代码行数:33,代码来源:cdnsync.py
示例16: main
def main():
rhnSQL.initDB()
if not args:
print("No module specified")
return 0
if '.' not in sys.path:
sys.path.append('.')
g = globals()
for module_name in args:
print("Checking module %s" % module_name)
pmn = proper_module_name(module_name)
try:
m = __import__(pmn)
g[module_name] = m
except ImportError:
e = sys.exc_info()[1]
print("Unable to import module %s: %s" % (module_name, e))
continue
comps = pmn.split('.')
for c in comps[1:]:
m = getattr(m, c)
for mod, name, statement in get_class_instances(m, rhnSQL.Statement):
try:
rhnSQL.prepare(statement)
except rhnSQL.SQLStatementPrepareError:
e = sys.exc_info()[1]
print("Error: %s.%s: %s" % (mod.__name__, name, e))
开发者ID:jdobes,项目名称:spacewalk,代码行数:33,代码来源:db-checker.py
示例17: processCommandline
def processCommandline():
options = [
Option('--ca-cert', action='store', default=DEFAULT_TRUSTED_CERT, type="string", help='public CA certificate, default is %s' % DEFAULT_TRUSTED_CERT),
Option('--label', action='store', default='RHN-ORG-TRUSTED-SSL-CERT', type="string", help='FOR TESTING ONLY - alternative database label for this CA certificate, default is "RHN-ORG-TRUSTED-SSL-CERT"'),
Option('-v','--verbose', action='count', help='be verbose (accumulable: -vvv means "be *really* verbose").'),
]
values, args = OptionParser(option_list=options).parse_args()
# we take no extra commandline arguments that are not linked to an option
if args:
msg = ("ERROR: these arguments make no sense in this context (try "
"--help): %s\n" % repr(args))
raise ValueError(msg)
if not os.path.exists(values.ca_cert):
sys.stderr.write("ERROR: can't find CA certificate at this location: "
"%s\n" % values.ca_cert)
sys.exit(10)
try:
rhnSQL.initDB()
except:
sys.stderr.write("""\
ERROR: there was a problem trying to initialize the database:
%s\n""" % rhnTB.fetchTraceback())
sys.exit(11)
if values.verbose:
print 'Public CA SSL certificate: %s' % values.ca_cert
return values
开发者ID:T-D-Oe,项目名称:spacewalk,代码行数:34,代码来源:rhn_ssl_dbstore.py
示例18: main
def main():
rhnSQL.initDB()
channel = {"label": "mibanescu-test2"}
orgid = 1198839
package_template = {"name": "useless", "version": "1.0.0", "arch": "noarch", "org_id": orgid}
batch = []
p = importLib.IncompletePackage()
p.populate(package_template)
p["release"] = "2"
p["channels"] = [channel]
batch.append(p)
p = importLib.IncompletePackage()
p.populate(package_template)
p["release"] = "3"
p["channels"] = [channel]
batch.append(p)
backend = backendOracle.OracleBackend()
cps = packageImport.ChannelPackageSubscription(batch, backend, caller="misa.testing", strict=1)
cps.run()
print cps.affected_channel_packages
开发者ID:kidaa30,项目名称:spacewalk,代码行数:25,代码来源:test_strict_channel_package_subscription.py
示例19: setup_config
def setup_config(self, config, force=0):
# Figure out the log level
debug_level = self.options.verbose
if debug_level is None:
debug_level = CFG.debug
self.debug_level = debug_level
logfile = self.options.logfile
if logfile is None or logfile == '':
logfile = CFG.log_file
initLOG(level=debug_level, log_file=logfile)
# Get the ssl cert
ssl_cert = CFG.osa_ssl_cert
try:
self.check_cert(ssl_cert)
except jabber_lib.InvalidCertError:
e = sys.exc_info()[1]
log_error("Invalid SSL certificate:", e)
return 1
self.ssl_cert = ssl_cert
rhnSQL.initDB()
self._username = 'rhn-dispatcher-sat'
self._password = self.get_dispatcher_password(self._username)
if not self._password:
self._password = self.create_dispatcher_password(32)
self._resource = 'superclient'
js = config.get('jabber_server')
self._jabber_servers = [ idn_ascii_to_puny(js) ]
开发者ID:m47ik,项目名称:uyuni,代码行数:31,代码来源:osa_dispatcher.py
示例20: main
def main():
if len(sys.argv) == 1:
server_name = 'xmlrpc.rhn.webdev.redhat.com'
else:
server_name = sys.argv[1]
if len(sys.argv) <= 2:
db_name = 'rhnuser/[email protected]'
else:
db_name = sys.argv[2]
rhnSQL.initDB(db_name)
uri = "http://%s/XMLRPC" % (server_name, )
s = rpclib.Server(uri)
username = password = "test-username-%.3f" % time.time()
email = "misa+%[email protected]" % username
s.registration.reserve_user(username, password)
s.registration.new_user(username, password, email)
data = {
'os_release' : '9',
'architecture' : 'athlon-redhat-linux',
'profile_name' : 'Test profile for %s' % username,
'username' : username,
'password' : password,
}
systemid = s.registration.new_system(data)
str_caps = [
'this.is.bogus1(0)=0',
'this.is.bogus2(1)=1',
'this.is.bogus3(2)=2',
]
for cap in str_caps:
s.add_header('X-RHN-Client-Capability', cap)
# Add some packages
packages = [
['a', '1', '1', ''],
['b', '2', '2', ''],
['c', '3', '3', ''],
]
s.registration.update_packages(systemid, packages)
sobj = rhnServer.get(systemid)
server_id = sobj.getid()
print "Registered server", server_id
caps = rhnCapability.get_db_client_capabilities(server_id)
caps_len = len(caps.keys())
expected_len = len(str_caps)
if caps_len != expected_len:
print "FAILED: return %d caps, expected %d" % (caps_len, expected_len)
return 1
print "Succeeded", caps
return 0
开发者ID:bjmingyang,项目名称:spacewalk,代码行数:59,代码来源:test_capabilities.py
注:本文中的spacewalk.server.rhnSQL.initDB函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论