本文整理汇总了Python中spacewalk.server.rhnSQL.read_lob函数的典型用法代码示例。如果您正苦于以下问题:Python read_lob函数的具体用法?Python read_lob怎么用?Python read_lob使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了read_lob函数的14个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: verify
def verify(blob_values):
q = """
select val1, val2 from test_blob_update where id1 = :id1 and %s
"""
for v in blob_values:
i1 = v[0]
i2 = v[1]
v1 = v[2]
v2 = v[3]
hval = {'id1': i1}
if i2 is None:
s = "id2 is null"
else:
s = "id2 = :id2"
hval['id2'] = i2
h = rhnSQL.prepare(q % s)
h.execute(**hval)
row = h.fetchone_dict()
val1 = row['val1']
val2 = row['val2']
val1_val = rhnSQL.read_lob(val1)
val2_val = rhnSQL.read_lob(val2)
assert v1 == val1_val, "Not equal: %s, %s" % (repr(v1), repr(val1_val))
assert v2 == val2_val, "Not equal: %s, %s" % (repr(v2), repr(val2_val))
print("Verification passes")
开发者ID:BlackSmith,项目名称:spacewalk,代码行数:26,代码来源:test_blob_update.py
示例2: test_lobs
def test_lobs(self):
new_id = rhnSQL.Sequence('misatestlob_id_seq').next()
h = rhnSQL.prepare("""
insert into misatestlob (id, val) values (:id, empty_blob())
""")
h.execute(id=new_id)
h = rhnSQL.prepare("""
select val from misatestlob where id = :id for update of val
""")
h.execute(id=new_id)
row = h.fetchone_dict()
self.assertNotEqual(row, None)
lob = row['val']
s = ""
for i in range(256):
s = s + chr(i)
lob.write(s)
rhnSQL.commit()
h = rhnSQL.prepare("""
select val from misatestlob where id = :id
""")
h.execute(id=new_id)
row = h.fetchone_dict()
self.assertNotEqual(row, None)
lob = row['val']
data = rhnSQL.read_lob(lob)
self.assertEqual(data, s)
开发者ID:TJM,项目名称:spacewalk,代码行数:29,代码来源:test_lob.py
示例3: _get_file_revision
def _get_file_revision(self, config_channel, revision, path):
if revision and not revision.isdigit():
raise rhnFault(4016, "Invalid revision number '%s' specified for path %s "
"in channel %s" % (revision, path, config_channel),
explain=0)
f = self._get_file(config_channel, path, revision=revision)
if not f:
raise rhnFault(4011, "File %s (revision %s) does not exist "
"in channel %s" % (path, revision, config_channel),
explain=0)
if f['label'] == 'file' and f['is_binary'] == 'Y':
raise rhnFault(4004, "File %s (revision %s) seems to contain "
"binary data" % (path, revision),
explain=0)
# We have to read the contents of the first file here, because the LOB
# object is tied to a cursor; if we re-execute the cursor, the LOB
# seems to be invalid (bug 151220)
# Empty files or directories may have NULL instead of lobs
fc_lob = f.get('file_contents')
if fc_lob:
f['file_content'] = rhnSQL.read_lob(fc_lob).splitlines()
else:
f['file_content'] = ''
return f
开发者ID:BlackSmith,项目名称:spacewalk,代码行数:27,代码来源:rhn_config_management.py
示例4: _checkCertMatch_rhnCryptoKey
def _checkCertMatch_rhnCryptoKey(cert, description, org_id, deleteRowYN=0,
verbosity=0):
""" is there an CA SSL certificate already in the database?
If yes:
return ID:
-1, then no cert in DB
None if they are identical (i.e., nothing to do)
0...N if cert is in database
if found, optionally deletes the row and returns -1
Used ONLY by: store_rhnCryptoKey(...)
"""
row = lookup_cert(description, org_id)
rhn_cryptokey_id = -1
if row:
if cert == rhnSQL.read_lob(row['key']):
# match found, nothing to do
if verbosity:
print("Nothing to do: certificate to be pushed matches certificate in database.")
return
# there can only be one (bugzilla: 120297)
rhn_cryptokey_id = int(row['id'])
# print 'found existing certificate - id:', rhn_cryptokey_id
# NUKE IT!
if deleteRowYN:
# print 'found a cert, nuking it! id:', rhn_cryptokey_id
h = rhnSQL.prepare('delete from rhnCryptoKey where id=:rhn_cryptokey_id')
h.execute(rhn_cryptokey_id=rhn_cryptokey_id)
# rhnSQL.commit()
rhn_cryptokey_id = -1
return rhn_cryptokey_id
开发者ID:BlackSmith,项目名称:spacewalk,代码行数:32,代码来源:satCerts.py
示例5: _checkCertMatch_rhnCryptoKey
def _checkCertMatch_rhnCryptoKey(caCert, description, org_id, deleteRowYN=0, verbosity=0):
""" is there an CA SSL certificate already in the database?
If yes:
return ID:
-1, then no cert in DB
None if they are identical (i.e., nothing to do)
0...N if cert is in database
if found, optionally deletes the row and returns -1
Used ONLY by: store_rhnCryptoKey(...)
"""
cert = open(caCert, "rb").read().strip()
h = rhnSQL.prepare(_querySelectCryptoCertInfo)
h.execute(description=description, org_id=org_id)
row = h.fetchone_dict()
rhn_cryptokey_id = -1
if row:
if cert == rhnSQL.read_lob(row["key"]):
# match found, nothing to do
if verbosity:
print "Nothing to do: certificate to be pushed matches certificate in database."
return
# there can only be one (bugzilla: 120297)
rhn_cryptokey_id = int(row["id"])
# print 'found existing certificate - id:', rhn_cryptokey_id
## NUKE IT!
if deleteRowYN:
# print 'found a cert, nuking it! id:', rhn_cryptokey_id
h = rhnSQL.prepare("delete from rhnCryptoKey where id=:rhn_cryptokey_id")
h.execute(rhn_cryptokey_id=rhn_cryptokey_id)
# rhnSQL.commit()
rhn_cryptokey_id = -1
return rhn_cryptokey_id
开发者ID:jbzhang99,项目名称:spacewalk,代码行数:35,代码来源:satCerts.py
示例6: retrieve_db_cert
def retrieve_db_cert(label='rhn-satellite-cert'):
h = rhnSQL.prepare(_query_latest_version)
h.execute(label=label)
row = h.fetchone_dict()
if not row:
return None
row['cert'] = rhnSQL.read_lob(row['cert'])
return row
开发者ID:T-D-Oe,项目名称:spacewalk,代码行数:8,代码来源:satCerts.py
示例7: _push_contents
def _push_contents(self, file):
checksum_type = 'sha256' # FIXME: this should be configuration option
file['file_size'] = 0
file['is_binary'] = 'N'
file_path = file.get('path')
file_contents = file.get('file_contents') or ''
if 'enc64' in file and file_contents:
file_contents = base64.decodestring(file_contents)
if 'config_file_type_id' not in file:
log_debug(4, "Client does not support config directories, so set file_type_id to 1")
file['config_file_type_id'] = '1'
file['checksum_type'] = checksum_type
file['checksum'] = getStringChecksum(checksum_type, file_contents or '')
if file_contents:
file['file_size'] = len(file_contents)
if file['file_size'] > self._get_maximum_file_size():
raise ConfigFileTooLargeError(file_path, file['file_size'])
# Is the content binary data?
# XXX We may need a heuristic; this is what the web site does, and we
# have to be consistent
# XXX Yes this is iterating over a string
try:
file_contents.decode('UTF-8')
except UnicodeDecodeError:
file['is_binary'] = 'Y'
h = rhnSQL.prepare(self._query_content_lookup)
h.execute(**file)
row = h.fetchone_dict()
if row:
db_contents = rhnSQL.read_lob(row['contents']) or ''
if file_contents == db_contents:
# Same content
file['config_content_id'] = row['id']
log_debug(5, "same content")
return
# We have to insert a new file now
content_seq = rhnSQL.Sequence('rhn_confcontent_id_seq')
config_content_id = content_seq.next()
file['config_content_id'] = config_content_id
file['contents'] = file_contents
h = rhnSQL.prepare(self._query_insert_content,
blob_map={'contents': 'contents'})
h.execute(**file)
开发者ID:TJM,项目名称:spacewalk,代码行数:56,代码来源:configFilesHandler.py
示例8: get
def get(name, modified = None, raw = None, compressed = None):
# Check to see if the entry is in the database, with the right version
h = _fetch_cursor(key=name, modified=modified)
row = h.fetchone_dict()
if not row:
# Key not found
return None
if modified and row['delta'] != 0:
# Different version
log_debug(4, "database cache: different version")
return None
if modified is None:
# The caller doesn't care about the modified time, but we do, since we
# want to fetch the same version from the disk cache
modified = row['modified']
if rhnCache.has_key(name, modified):
# We have the value
log_debug(4, "Filesystem cache HIT")
return rhnCache.get(name, modified=modified, raw=raw)
log_debug(4, "Filesystem cache MISS")
# The disk cache doesn't have this key at all, or it's a modified value
# Fetch the value from the database
v = row['value']
# Update the accessed field
rhnSQL.Procedure("rhn_cache_update_accessed")(name)
if compressed:
io = cStringIO.StringIO()
io.write(rhnSQL.read_lob(v))
io.seek(0, 0)
# XXX For about 40M of compressed data sometimes we get:
# zlib.error: Error -3 while decompressing: incomplete dynamic bit lengths tree
v = gzip.GzipFile(None, "r", 0, io)
try:
data = v.read()
except (ValueError, IOError, gzip.zlib.error), e:
# XXX poking at gzip.zlib may not be that well-advised
log_error("rhnDatabaseCache: gzip error for key %s: %s" % (
name, e))
# Ignore this entry in the database cache, it has invalid data
return None
开发者ID:m47ik,项目名称:uyuni,代码行数:52,代码来源:rhnDatabaseCache.py
示例9: format_file_results
def format_file_results(row, server=None):
encoding = ''
contents = None
contents = rhnSQL.read_lob(row['file_contents']) or ''
checksum = row['checksum'] or ''
if server and (row['is_binary'] == 'N') and contents:
interpolator = ServerTemplatedDocument(server,
start_delim=row['delim_start'],
end_delim=row['delim_end'])
contents = interpolator.interpolate(contents)
if row['checksum_type']:
checksummer = hashlib.new(row['checksum_type'])
checksummer.update(contents)
checksum = checksummer.hexdigest()
if contents:
client_caps = rhnCapability.get_client_capabilities()
if client_caps and 'configfiles.base64_enc' in client_caps:
encoding = 'base64'
contents = base64.encodestring(contents)
if row.get('modified', False):
m_date = xmlrpclib.DateTime(str(row['modified']))
else:
m_date = ''
return {
'path': row['path'],
'config_channel': row['config_channel'],
'file_contents': contents,
'symlink': row['symlink'] or '',
'checksum_type': row['checksum_type'] or '',
'checksum': checksum,
'verify_contents': True,
'delim_start': row['delim_start'] or '',
'delim_end': row['delim_end'] or '',
'revision': row['revision'] or '',
'username': row['username'] or '',
'groupname': row['groupname'] or '',
'filemode': row['filemode'] or '',
'encoding': encoding or '',
'filetype': row['label'],
'selinux_ctx': row['selinux_ctx'] or '',
'modified': m_date,
'is_binary': row['is_binary'] or '',
}
开发者ID:BlackSmith,项目名称:spacewalk,代码行数:47,代码来源:configFilesHandler.py
示例10: xccdf_eval
def xccdf_eval(server_id, action_id, dry_run=0):
log_debug(3)
statement = """
select path, parameters
from rhnActionScap
where action_id = :action_id"""
h = rhnSQL.prepare(statement)
h.execute(action_id=action_id)
d = h.fetchone_dict()
if not d:
raise InvalidAction("scap.xccdf_eval: Unknown action id "
"%s for server %s" % (action_id, server_id))
return ({
'path': d['path'],
'id': action_id,
'file_size': _scap_file_limit(server_id),
'params': rhnSQL.read_lob(d['parameters']) or ''
},)
开发者ID:BlackSmith,项目名称:spacewalk,代码行数:18,代码来源:scap.py
示例11: run
def run(server_id, action_id, dry_run=0):
log_debug(3, dry_run)
data = {}
h = rhnSQL.prepare(_query_action_script)
h.execute(action_id=action_id)
info = h.fetchone_dict() or []
if info:
data["username"] = info["username"]
data["groupname"] = info["groupname"]
data["timeout"] = info["timeout"] or ""
data["script"] = rhnSQL.read_lob(info["script"]) or ""
# used to make the resulting times make some sense in the db
data["now"] = info["now"]
return action_id, data
开发者ID:pombredanne,项目名称:spacewalk-2,代码行数:19,代码来源:script.py
示例12: run
def run(server_id, action_id, dry_run=0):
log_debug(3, dry_run)
data = {}
h = rhnSQL.prepare(_query_action_script)
h.execute(action_id=action_id)
info = h.fetchone_dict() or []
if info:
data['username'] = info['username']
data['groupname'] = info['groupname']
data['timeout'] = info['timeout'] or ''
data['script'] = rhnSQL.read_lob(info['script']) or ''
# used to make the resulting times make some sense in the db
data['now'] = info['now']
return action_id, data
开发者ID:BlackSmith,项目名称:spacewalk,代码行数:19,代码来源:script.py
示例13: format_file_results
def format_file_results(row, server=None):
encoding = ''
contents = None
contents = rhnSQL.read_lob(row['file_contents']) or ''
if server and (row['is_binary'] == 'N') and contents:
interpolator = ServerTemplatedDocument(server,
start_delim=row['delim_start'],
end_delim=row['delim_end'])
contents = interpolator.interpolate(contents)
if contents:
client_caps = rhnCapability.get_client_capabilities()
if client_caps and client_caps.has_key('configfiles.base64_enc'):
encoding = 'base64'
contents = base64.encodestring(contents)
return {
'path' : row['path'],
'config_channel': row['config_channel'],
'file_contents' : contents,
'symlink' : row['symlink'] or '',
'checksum_type' : row['checksum_type'] or '',
'checksum' : row['checksum'] or '',
'delim_start' : row['delim_start'] or '',
'delim_end' : row['delim_end'] or '',
'revision' : row['revision'] or '',
'username' : row['username'] or '',
'groupname' : row['groupname'] or '',
'filemode' : row['filemode'] or '',
'encoding' : encoding or '',
'filetype' : row['label'],
'selinux_ctx' : row['selinux_ctx'] or '',
}
开发者ID:bjmingyang,项目名称:spacewalk,代码行数:36,代码来源:configFilesHandler.py
示例14: _push_contents
def _push_contents(self, file):
checksum_type = 'md5' # FIXME: this should be configuration option
file['file_size'] = 0
file['is_binary'] = 'N'
file_path = file.get('path')
file_contents = file.get('file_contents') or ''
if file.has_key('enc64') and file_contents:
file_contents = base64.decodestring(file_contents)
if not file.has_key('config_file_type_id'):
log_debug(4, "Client does not support config directories, so set file_type_id to 1")
file['config_file_type_id'] = '1'
file['checksum_type'] = checksum_type
file['checksum'] = getStringChecksum(checksum_type, file_contents or '')
if file_contents:
file['file_size'] = len(file_contents)
if file['file_size'] > self._get_maximum_file_size():
raise ConfigFileTooLargeError(file_path, file['file_size'])
# Is the content binary data?
# XXX We may need a heuristic; this is what the web site does, and we
# have to be consistent
# XXX Yes this is iterating over a string
for c in file_contents:
if ord(c) > 127:
file['is_binary'] = 'Y'
break
h = rhnSQL.prepare(self._query_content_lookup)
apply(h.execute, (), file)
row = h.fetchone_dict()
if row:
db_contents = rhnSQL.read_lob(row['contents']) or ''
if file_contents == db_contents:
# Same content
file['config_content_id'] = row['id']
log_debug(5, "same content")
return
# We have to insert a new file now
content_seq = rhnSQL.Sequence('rhn_confcontent_id_seq')
config_content_id = content_seq.next()
file['config_content_id'] = config_content_id
if file_contents:
h = rhnSQL.prepare(self._query_insert_content)
else:
h = rhnSQL.prepare(self._query_insert_null_content)
apply(h.execute, (), file)
# Row should be there now
h = rhnSQL.prepare(self._query_get_content_row)
apply(h.execute, (), file)
row = h.fetchone_dict()
if not row:
# Ouch
raise rhnException("Row should have been inserted but it's not")
if file_contents:
log_debug(5, "writing file contents to blob")
lob = row['contents']
lob.write(file_contents)
开发者ID:bjmingyang,项目名称:spacewalk,代码行数:74,代码来源:configFilesHandler.py
注:本文中的spacewalk.server.rhnSQL.read_lob函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论