• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python rhnSQL.initDB函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中spacewalk.server.rhnSQL.initDB函数的典型用法代码示例。如果您正苦于以下问题:Python initDB函数的具体用法?Python initDB怎么用?Python initDB使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了initDB函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: main

def main():
    """Run the blob insert/update round trip on the test_blob_update table.

    Hands the initial rows to setup() and verifies them, then pushes the
    modified rows through TableUpdate.query(), commits, and verifies again.
    """
    rhnSQL.initDB()

    initial_rows = [
        # Regular update
        [1, 1,     'value 11', 'value 12', 1],
        [2, 1,     'value 21', 'value 22', 2],
        # Update with one of the primary keys being None
        [3, None,  'value 31', 'value 32', 3],
        [4, None,  'value 41', 'value 42', 4],
        # Test for writing an empty string into the blob
        [5, 5,     '',         'value 52', 5],
        # Test for writing a shorter string into the blob
        [6, 6,     'value 61', 'value 62', 6],
    ]

    # Row index -> replacement [val1, val2, nval] for the update pass
    replacements = {
        0: ['new value 11', 'new value 12', 11],
        2: ['new value 31 ' * 1024, 'new value 32' * 2048, 2],
        4: ['new value 51', '', 33],
        5: ['v61', 'v61', 4],
    }
    rows = [0, 2, 4, 5]

    updated_rows = initial_rows[:]
    for idx in rows:
        # Work on a copy so the initial rows keep their old values
        changed = initial_rows[idx][:]
        changed[2:5] = replacements[idx]
        updated_rows[idx] = changed

    test_blob_update = Table(
        "test_blob_update",
        fields={
            'id1': DBint(),
            'id2': DBint(),
            'val1': DBblob(),
            'val2': DBblob(),
            'nval': DBint(),
        },
        # Setting the nullable column to be the first one, to force a specific codepath
        pk=['id2', 'id1'],
        nullable=['id2'],
    )

    fields = ['id1', 'id2', 'val1', 'val2', 'nval']
    setup(test_blob_update, initial_rows, fields)
    print("Insert test")
    verify(initial_rows)

    updater = TableUpdate(test_blob_update, rhnSQL)
    updater.query(_build_update_hash(fields, updated_rows, rows))
    rhnSQL.commit()

    print("Updates test")
    verify(updated_rows)
开发者ID:BlackSmith,项目名称:spacewalk,代码行数:60,代码来源:test_blob_update.py


示例2: main

def main():
    global options_table, debug, verbose
    parser = OptionParser(option_list=options_table)

    (options, args) = parser.parse_args()

    if args:
        for arg in args:
            sys.stderr.write("Not a valid option ('%s'), try --help\n" % arg)
        sys.exit(-1)

    if options.verbose:
        verbose = 1

    if options.debug:
        debug = 1

    if not options.db:
        sys.stderr.write("--db not specified\n")
        sys.exit(1)

    print "Connecting to %s" % options.db
    rhnSQL.initDB(options.db)

    if options.update_filer:
        process_package_data()

    if options.update_sha256:
        process_sha256_packages()

    if options.update_kstrees:
        process_kickstart_trees()
开发者ID:bjmingyang,项目名称:spacewalk,代码行数:32,代码来源:updatePackages.py


示例3: main

def main():
    """Parse the command line, set up logging, initialize the database and
    run the update steps selected by the options.

    Exits with -1 when positional arguments are supplied.
    """
    global debug, verbose
    (options, args) = OptionParser(option_list=options_table).parse_args()

    # Positional arguments are not supported at all
    if args:
        for stray in args:
            sys.stderr.write("Not a valid option ('%s'), try --help\n" % stray)
        sys.exit(-1)

    if options.verbose:
        initLOG("stdout", options.verbose or 0)
        verbose = 1

    if options.debug:
        initLOG(CFG.LOG_FILE, options.debug or 0)
        debug = 1

    rhnSQL.initDB()

    # Option attribute -> step to run when that option is set, in order
    steps = (
        ('update_filer', process_package_data),
        ('update_sha256', process_sha256_packages),
        ('update_kstrees', process_kickstart_trees),
        ('update_package_files', process_package_files),
        ('update_changelog', process_changelog),
    )
    for flag, step in steps:
        if getattr(options, flag):
            step()
开发者ID:BlackSmith,项目名称:spacewalk,代码行数:35,代码来源:updatePackages.py


示例4: getParentsChilds

def getParentsChilds(b_only_custom=False):
    """Return a dict mapping parent channel labels to lists of child labels.

    When b_only_custom is true, rows whose channel is not a custom channel
    (per rhnChannel.isCustomChannel) are skipped. A child whose parent has
    no entry in the result is recorded as a parent itself.
    """
    initCFG('server.satellite')
    rhnSQL.initDB()

    query = """
        select c1.label, c2.label parent_channel, c1.id
        from rhnChannel c1 left outer join rhnChannel c2 on c1.parent_channel = c2.id
        order by c2.label desc, c1.label asc
    """
    cursor = rhnSQL.prepare(query)
    cursor.execute()

    tree = {}
    while True:
        row = cursor.fetchone_dict()
        if not row:
            break
        if b_only_custom and not rhnChannel.isCustomChannel(row['id']):
            continue

        label = row['label']
        parent = row['parent_channel']
        if parent and parent in tree:
            tree[parent].append(label)
        else:
            # Either a real parent, or a child whose parent was skipped
            # (not a custom channel): treat it like a parent for our purposes
            tree[label] = []

    return tree
开发者ID:u-s-p,项目名称:spacewalk,代码行数:30,代码来源:reposync.py


示例5: deactivate

 def deactivate():
     """Function to remove certificates and manifest repositories from DB"""
     rhnSQL.initDB()
     # Each cleanup step is announced in the log before it runs
     steps = (
         ("Removing certificates...", Activation._remove_certificates),
         ("Removing manifest repositories...", Activation._remove_repositories),
     )
     for message, action in steps:
         log(0, message)
         action()
开发者ID:jdobes,项目名称:spacewalk,代码行数:7,代码来源:activation.py


示例6: getParentsChilds

def getParentsChilds():
    """Return a dict mapping custom parent channel labels to child labels.

    Only rows for which rhnChannel.isCustomChannel() is true are considered.
    A custom channel without a parent gets an (initially empty) child list.
    A custom child is appended to its parent's list when the parent already
    has an entry; otherwise the child is recorded as a parent itself.

    Returns:
        dict of {channel label: [child channel labels]}
    """
    initCFG('server')
    rhnSQL.initDB()

    sql = """
        select c1.label, c2.label parent_channel, c1.id
        from rhnChannel c1 left outer join rhnChannel c2 on c1.parent_channel = c2.id
        order by c2.label desc, c1.label asc
    """
    h = rhnSQL.prepare(sql)
    h.execute()
    d_parents = {}
    while 1:
        row = h.fetchone_dict()
        if not row:
            break
        if not rhnChannel.isCustomChannel(row['id']):
            continue

        parent_channel = row['parent_channel']
        if parent_channel and parent_channel in d_parents:
            d_parents[parent_channel].append(row['label'])
        else:
            # Either a top-level channel, or a custom child whose parent has
            # no entry (e.g. the parent is not a custom channel and was
            # skipped above). Indexing d_parents[parent_channel] here used to
            # raise KeyError; treat the child like a parent instead.
            d_parents[row['label']] = []

    return d_parents
开发者ID:cliffy94,项目名称:spacewalk,代码行数:25,代码来源:reposync.py


示例7: __init__

    def __init__(self, local_mount_point=None):
        """Initialize the DB connection, build the repository tree and load
        the JSON mapping files.

        Raises CdnMappingsLoadError when one of the mapping files cannot be
        read (IOError).
        """
        rhnSQL.initDB()
        self.local_mount_point = local_mount_point
        self.repository_tree = CdnRepositoryTree()
        self._populate_repository_tree()

        # (attribute to fill, JSON file to read), loaded in this order
        mapping_files = (
            # Channel to repositories mapping
            ('content_source_mapping', constants.CONTENT_SOURCE_MAPPING_PATH),
            # Channel to kickstart repositories mapping
            ('kickstart_source_mapping', constants.KICKSTART_SOURCE_MAPPING_PATH),
            # Kickstart metadata
            ('kickstart_metadata', constants.KICKSTART_DEFINITIONS_PATH),
        )

        handle = None
        try:
            try:
                for attr_name, path in mapping_files:
                    handle = open(path, 'r')
                    setattr(self, attr_name, json.load(handle))
                    handle.close()
            except IOError:
                raise CdnMappingsLoadError("Problem with loading file: %s" % sys.exc_info()[1])
        finally:
            # Make sure the last opened file is closed on any exit path
            if handle is not None:
                handle.close()
开发者ID:phurrelmann,项目名称:spacewalk,代码行数:29,代码来源:repository.py


示例8: main

def main():
    initCFG("server.app")
    rhnSQL.initDB('rhnuser/[email protected]')

    channel = { 'label' : 'mibanescu-test2' }

    orgid = 1198839
    package_template = {
        'name'      : 'useless',
        'version'   : '1.0.0',
        'arch'      : 'noarch',
        'org_id'    : orgid,
    }
        
    batch = []
    p = importLib.IncompletePackage()
    p.populate(package_template)
    p['release'] = '2'
    p['channels'] = [channel]
    batch.append(p)
    
    p = importLib.IncompletePackage()
    p.populate(package_template)
    p['release'] = '3'
    p['channels'] = [channel]
    batch.append(p)

    backend = backendOracle.OracleBackend()
    cps = packageImport.ChannelPackageSubscription(batch, backend,
        caller="misa.testing", strict=1)
    cps.run()
    print cps.affected_channel_packages
开发者ID:bjmingyang,项目名称:spacewalk,代码行数:32,代码来源:test_strict_channel_package_subscription.py


示例9: getISSCurrentMaster

def getISSCurrentMaster():
    """Return the label of the ISS master flagged as current, or None when
    the query yields no row."""
    rhnSQL.initDB()
    query = "select label from rhnISSMaster where is_current_master = 'Y'"
    row = rhnSQL.fetchone_dict(query)
    return row['label'] if row else None
开发者ID:m47ik,项目名称:uyuni,代码行数:7,代码来源:suseLib.py


示例10: __init__

    def __init__(self, local_mount_point=None, client_cert_id=None):
        """Initialize the DB connection, build the repository tree and load
        the JSON mapping files.

        When a mapping file cannot be read (IOError) the problem is logged
        and all three mappings are reset to empty dicts.
        """
        rhnSQL.initDB()
        self.local_mount_point = local_mount_point
        self.repository_tree = CdnRepositoryTree()
        self._populate_repository_tree(client_cert_id=client_cert_id)

        # (attribute to fill, JSON file to read), loaded in this order
        mapping_files = (
            # Channel to repositories mapping
            ('content_source_mapping', constants.CONTENT_SOURCE_MAPPING_PATH),
            # Channel to kickstart repositories mapping
            ('kickstart_source_mapping', constants.KICKSTART_SOURCE_MAPPING_PATH),
            # Kickstart metadata
            ('kickstart_metadata', constants.KICKSTART_DEFINITIONS_PATH),
        )

        handle = None
        try:
            try:
                for attr_name, path in mapping_files:
                    handle = open(path, 'r')
                    setattr(self, attr_name, json.load(handle))
                    handle.close()
            except IOError:
                log(1, "Ignoring channel mappings: %s" % sys.exc_info()[1])
                # Drop anything partially loaded: every mapping becomes empty
                for attr_name, _path in mapping_files:
                    setattr(self, attr_name, {})
        finally:
            # Make sure the last opened file is closed on any exit path
            if handle is not None:
                handle.close()

        self.__init_repository_to_channels_mapping()
开发者ID:jdobes,项目名称:spacewalk,代码行数:34,代码来源:repository.py


示例11: main

    def main(self):
        parser = OptionParser(option_list=options_table)

        (self.options, _args) = parser.parse_args()

        rhnSQL.initDB()

        self._channels_hash = self._get_channels()

        package_ids = self._get_packages()
        if package_ids is None:
            return 1

        if self.options.backup_file:
            self._backup_packages(package_ids, self.options.backup_file)

        try:
            self._add_package_header_values(package_ids)
        except:
            rhnSQL.rollback()
            raise

        if self.options.commit:
            print "Commiting work"
            rhnSQL.commit()
        else:
            print "Rolling back"
            rhnSQL.rollback()
开发者ID:TJM,项目名称:spacewalk,代码行数:28,代码来源:satComputePkgHeaders.py


示例12: isAllowedSlave

def isAllowedSlave(hostname):
    """Return True when rhnISSSlave has an enabled row for hostname.

    Otherwise an error is logged and False is returned.
    """
    rhnSQL.initDB()
    enabled_row = rhnSQL.fetchone_dict(
        "select 1 from rhnISSSlave where slave = :hostname and enabled = 'Y'",
        hostname=idn_puny_to_unicode(hostname))
    if enabled_row:
        return True

    log_error('Server "%s" is not enabled for ISS.' % hostname)
    return False
开发者ID:m47ik,项目名称:uyuni,代码行数:7,代码来源:suseLib.py


示例13: test_failed_connection

    def test_failed_connection(self):
        """initDB must raise SQLConnectError for bogus connection details and
        must not raise for the configured DB_SETTINGS."""
        # Connect to localhost and look for db on a totally bogus port, this
        # makes the test faster.
        # Positional order: backend, host, port, database, username, password
        bogus_args = ("oracle", "localhost", 9000, "z", "x", "y")
        self.assertRaises(rhnSQL.SQLConnectError, rhnSQL.initDB, *bogus_args)

        try:
            # Settings are looked up inside the try on purpose: any failure
            # here is reported through self.fail() as well
            valid_settings = {
                "backend": "oracle",
                "username": DB_SETTINGS["user"],
                "password": DB_SETTINGS["password"],
                "database": DB_SETTINGS["database"],
            }
            rhnSQL.initDB(**valid_settings)
        except:
            self.fail(
                "Exception raised while trying to connect to the db using proper settings. That's not expected to happen.")
开发者ID:flavio,项目名称:spacewalk,代码行数:30,代码来源:test_exceptions.py


示例14: setUp

 def setUp(self):
     """Initialize the database connection from DB_SETTINGS before each test."""
     connection_settings = dict(
         backend="oracle",
         username=DB_SETTINGS["user"],
         password=DB_SETTINGS["password"],
         database=DB_SETTINGS["database"],
     )
     rhnSQL.initDB(**connection_settings)
开发者ID:m47ik,项目名称:uyuni,代码行数:7,代码来源:test_rhnChannel.py


示例15: __init__

    def __init__(self):
        """Initialize DB and configuration, load the JSON mapping files and
        collect the labels of channels that have no org_id."""
        rhnSQL.initDB()
        initCFG('server.satellite')

        # (attribute to fill, JSON file to read), loaded in this order
        mapping_files = (
            # Channel families mapping to channels
            ('families', constants.CHANNEL_FAMILY_MAPPING_PATH),
            # Channel metadata
            ('channel_metadata', constants.CHANNEL_DEFINITIONS_PATH),
            # Dist/Release channel mapping
            ('channel_dist_mapping', constants.CHANNEL_DIST_MAPPING_PATH),
            # Channel to repositories mapping
            ('content_source_mapping', constants.CONTENT_SOURCE_MAPPING_PATH),
        )
        for attr_name, path in mapping_files:
            with open(path, 'r') as mapping_file:
                setattr(self, attr_name, json.load(mapping_file))

        # Map channels to their channel family
        self.channel_to_family = {}
        for family, details in self.families.items():
            for channel in details['channels']:
                self.channel_to_family[channel] = family

        # Set already synced channels
        cursor = rhnSQL.prepare("""
            select label from rhnChannel where org_id is null
        """)
        cursor.execute()
        synced_rows = cursor.fetchall_dict() or []
        self.synced_channels = [row['label'] for row in synced_rows]
开发者ID:jiridostal,项目名称:spacewalk,代码行数:33,代码来源:cdnsync.py


示例16: main

def main():
    """Import each module named in args and try to prepare every
    rhnSQL.Statement found in it, printing the ones that fail.

    Returns 0 when no module was specified.
    """
    rhnSQL.initDB()

    if not args:
        print("No module specified")
        return 0

    # Allow importing modules from the current directory
    if '.' not in sys.path:
        sys.path.append('.')

    namespace = globals()

    for module_name in args:
        print("Checking module %s" % module_name)
        dotted_name = proper_module_name(module_name)
        try:
            module = __import__(dotted_name)
            namespace[module_name] = module
        except ImportError:
            print("Unable to import module %s: %s" % (module_name, sys.exc_info()[1]))
            continue

        # __import__ returns the top-level package; walk down to the leaf
        for component in dotted_name.split('.')[1:]:
            module = getattr(module, component)

        for owner, name, statement in get_class_instances(module, rhnSQL.Statement):
            try:
                rhnSQL.prepare(statement)
            except rhnSQL.SQLStatementPrepareError:
                print("Error: %s.%s: %s" % (owner.__name__, name, sys.exc_info()[1]))
开发者ID:jdobes,项目名称:spacewalk,代码行数:33,代码来源:db-checker.py


示例17: processCommandline

def processCommandline():

    options = [
        Option('--ca-cert',      action='store', default=DEFAULT_TRUSTED_CERT, type="string", help='public CA certificate, default is %s' % DEFAULT_TRUSTED_CERT),
        Option('--label',        action='store', default='RHN-ORG-TRUSTED-SSL-CERT', type="string", help='FOR TESTING ONLY - alternative database label for this CA certificate, default is "RHN-ORG-TRUSTED-SSL-CERT"'),
        Option('-v','--verbose', action='count', help='be verbose (accumulable: -vvv means "be *really* verbose").'),
              ]

    values, args = OptionParser(option_list=options).parse_args()

    # we take no extra commandline arguments that are not linked to an option
    if args:
        msg = ("ERROR: these arguments make no sense in this context (try "
               "--help): %s\n" % repr(args))
        raise ValueError(msg)

    if not os.path.exists(values.ca_cert):
        sys.stderr.write("ERROR: can't find CA certificate at this location: "
                         "%s\n" % values.ca_cert)
        sys.exit(10)

    try:
        rhnSQL.initDB()
    except:
        sys.stderr.write("""\
ERROR: there was a problem trying to initialize the database:

%s\n""" % rhnTB.fetchTraceback())
        sys.exit(11)

    if values.verbose:
        print 'Public CA SSL certificate:  %s' % values.ca_cert

    return values
开发者ID:T-D-Oe,项目名称:spacewalk,代码行数:34,代码来源:rhn_ssl_dbstore.py


示例18: main

def main():
    rhnSQL.initDB()

    channel = {"label": "mibanescu-test2"}

    orgid = 1198839
    package_template = {"name": "useless", "version": "1.0.0", "arch": "noarch", "org_id": orgid}

    batch = []
    p = importLib.IncompletePackage()
    p.populate(package_template)
    p["release"] = "2"
    p["channels"] = [channel]
    batch.append(p)

    p = importLib.IncompletePackage()
    p.populate(package_template)
    p["release"] = "3"
    p["channels"] = [channel]
    batch.append(p)

    backend = backendOracle.OracleBackend()
    cps = packageImport.ChannelPackageSubscription(batch, backend, caller="misa.testing", strict=1)
    cps.run()
    print cps.affected_channel_packages
开发者ID:kidaa30,项目名称:spacewalk,代码行数:25,代码来源:test_strict_channel_package_subscription.py


示例19: setup_config

    def setup_config(self, config, force=0):
        """Set up logging, the SSL certificate, the database connection and
        the dispatcher credentials.

        config is read for its 'jabber_server' entry; force is not used in
        this method body. Returns 1 when the SSL certificate is rejected,
        otherwise returns None.
        """
        # Figure out the log level: command line first, configuration second
        level = self.options.verbose
        if level is None:
            level = CFG.debug
        self.debug_level = level

        log_path = self.options.logfile
        if log_path in (None, ''):
            log_path = CFG.log_file
        initLOG(level=level, log_file=log_path)

        # Get the ssl cert
        ssl_cert = CFG.osa_ssl_cert
        try:
            self.check_cert(ssl_cert)
        except jabber_lib.InvalidCertError:
            log_error("Invalid SSL certificate:", sys.exc_info()[1])
            return 1
        self.ssl_cert = ssl_cert

        rhnSQL.initDB()

        self._username = 'rhn-dispatcher-sat'
        stored_password = self.get_dispatcher_password(self._username)
        self._password = stored_password
        if not stored_password:
            # Nothing stored yet: create a fresh password
            self._password = self.create_dispatcher_password(32)
        self._resource = 'superclient'
        self._jabber_servers = [idn_ascii_to_puny(config.get('jabber_server'))]
开发者ID:m47ik,项目名称:uyuni,代码行数:31,代码来源:osa_dispatcher.py


示例20: main

def main():
    if len(sys.argv) == 1:
        server_name = 'xmlrpc.rhn.webdev.redhat.com'
    else:
        server_name = sys.argv[1]

    if len(sys.argv) <= 2:
        db_name = 'rhnuser/[email protected]'
    else:
        db_name = sys.argv[2]

    rhnSQL.initDB(db_name)

    uri = "http://%s/XMLRPC" % (server_name, )
    s = rpclib.Server(uri)

    username = password = "test-username-%.3f" % time.time()
    email = "misa+%[email protected]" % username

    s.registration.reserve_user(username, password)
    s.registration.new_user(username, password, email)

    data = {
       'os_release'     : '9',
       'architecture'   : 'athlon-redhat-linux',
       'profile_name'   : 'Test profile for %s' % username,
       'username'       : username,
       'password'       : password,
    }
    systemid = s.registration.new_system(data)

    str_caps = [
        'this.is.bogus1(0)=0',
        'this.is.bogus2(1)=1',
        'this.is.bogus3(2)=2',
    ]
    for cap in str_caps:
        s.add_header('X-RHN-Client-Capability', cap)
    
    # Add some packages
    packages = [
        ['a', '1', '1', ''],
        ['b', '2', '2', ''],
        ['c', '3', '3', ''],
    ]
    s.registration.update_packages(systemid, packages)

    sobj = rhnServer.get(systemid)
    server_id = sobj.getid()
    print "Registered server", server_id

    caps = rhnCapability.get_db_client_capabilities(server_id)
    caps_len = len(caps.keys())
    expected_len = len(str_caps)
    if caps_len != expected_len:
        print "FAILED: return %d caps, expected %d" % (caps_len, expected_len)
        return 1
    print "Succeeded", caps
    return 0
开发者ID:bjmingyang,项目名称:spacewalk,代码行数:59,代码来源:test_capabilities.py



注:本文中的spacewalk.server.rhnSQL.initDB函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python rhnSQL.prepare函数代码示例发布时间:2022-05-27
下一篇:
Python rhnSQL.fetchone_dict函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap