• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python search.SearchType类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pyasm.search.SearchType的典型用法代码示例。如果您正苦于以下问题:Python SearchType类的具体用法?Python SearchType怎么用?Python SearchType使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了SearchType类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: __init__

    def __init__(my, search_type, config_base, input_prefix='', config=None):
        '''Resolve the search type and initialise the base widget state.

        search_type may be given as:
          - a search type string, which is looked up with SearchType.get()
          - a SearchType instance
          - an SObject subclass, whose SEARCH_TYPE attribute is looked up

        Anything else raises LayoutException.  config_base, input_prefix
        and config are stored on the instance unchanged.
        '''

        # isinstance() instead of an exact type() comparison, so that
        # subclasses of the string types are accepted as well
        if isinstance(search_type, types.StringTypes):
            my.search_type_obj = SearchType.get(search_type)
            my.search_type = search_type
        elif isinstance(search_type, SearchType):
            my.search_type_obj = search_type
            my.search_type = my.search_type_obj.get_base_key()
        elif inspect.isclass(search_type) and issubclass(search_type, SObject):
            my.search_type_obj = SearchType.get(search_type.SEARCH_TYPE)
            my.search_type = my.search_type_obj.get_base_key()
        else:
            raise LayoutException('search_type must be a string or an sobject')
        my.config = config
        my.config_base = config_base
        my.input_prefix = input_prefix
        my.element_names = []
        my.element_titles = []

        # imported locally, as in the original code
        from pyasm.web import DivWdg
        my.top = DivWdg()

        # Layout widgets compartmentalize their widgets in sections for drawing
        my.sections = {}

        super(BaseConfigWdg,my).__init__()
开发者ID:0-T-0,项目名称:TACTIC,代码行数:26,代码来源:base_config_wdg.py


示例2: create

    def create(self):

        project = Project.get_by_code(self.project_code)
        if project:

            self.delete()

        print "Setting up a basic Sample3d project"

        # create the project
        create_cmd = CreateProjectCmd(project_code=self.project_code, project_title="Sample 3D") #, project_type="unittest")
        create_cmd.execute()

        # install the unittest plugin
        installer = PluginInstaller(relative_dir="TACTIC/internal/sample3d", verbose=False)
        installer.execute()

        # add 30 shots
        for x in xrange(30):
            shot = SearchType.create("prod/shot")
            shot.set_value('name','shot%s'%x)
            shot.set_value('sequence_code','SEQ_01')
            shot.commit(triggers=False)

        if not Search.eval("@SOBJECT(prod/sequence['code','SEQ_01'])"):
            seq = SearchType.create("prod/sequence")
            seq.set_value('code','SEQ_01')
            seq.commit(triggers=False)
开发者ID:mincau,项目名称:TACTIC,代码行数:28,代码来源:environment.py


示例3: copy_sobject

    def copy_sobject(my, sobject, dst_search_type, context=None, checkin_mode='inplace'):

        new_sobject = SearchType.create(dst_search_type)
        search_type = SearchType.get(dst_search_type)
        columns = SearchType.get_columns(dst_search_type)

        data = sobject.get_data()
        for name, value in data.items():
            if name in ['id','pipeline_code']:
                continue

            if name not in columns:
                continue

            if not value:
                continue

            if name == "code":
                value = Common.get_next_sobject_code(sobject, 'code')
                if not value:
                    continue
            new_sobject.set_value(name, value)
        if SearchType.column_exists(dst_search_type, "project_code"):
            project_code = Project.get_project_code()
            new_sobject.set_value("project_code", project_code)
        new_sobject.commit()



        # get all of the current snapshots and file paths associated
        if not context:
            snapshots = Snapshot.get_all_current_by_sobject(sobject)
        else:
            snapshots = [Snapshot.get_current_by_sobject(sobject, context)]

        if not snapshots:
            return

        msgs = []
        for snapshot in snapshots:
            #file_paths = snapshot.get_all_lib_paths()
            file_paths_dict = snapshot.get_all_paths_dict()
            file_types = file_paths_dict.keys()
            if not file_types:
                continue

            # make sure the paths match the file_types
            file_paths = [file_paths_dict.get(x)[0] for x in file_types]

            mode = checkin_mode

            # checkin the files (inplace)
            try:
                context = snapshot.get_value('context')
                checkin = FileCheckin(new_sobject, context=context, file_paths=file_paths, file_types=file_types, mode=mode)
                checkin.execute()

                #print "done: ", context, new_sobject.get_related_sobjects("sthpw/snapshot")
            except CheckinException, e:
                msgs.append('Post-process Check-in Error for %s: %s ' %(context, e.__str__()))
开发者ID:0-T-0,项目名称:TACTIC,代码行数:60,代码来源:sobject_copy_cmd.py


示例4: _test_base_dir_alias

    def _test_base_dir_alias(my):
        '''Check that namings with a base_dir_alias check files in under
        the matching asset base directory.

        Configures three asset base dirs, creates one naming per alias for
        unittest/person, checks a file in for each context and verifies
        the lib dir and the existence of the checked in file.
        '''

        Config.set_value("checkin", "asset_base_dir", {
            'default': '/tmp/tactic/default',
            'alias': '/tmp/tactic/alias',
            'alias2': '/tmp/tactic/alias2',
        })
        asset_dict = Environment.get_asset_dirs()
        default_dir = asset_dict.get("default")
        my.assertEquals( "/tmp/tactic/default", default_dir)

        aliases = asset_dict.keys()
        # "plugins" is assumed in some branch
        if 'plugins' in aliases:
            my.assertEquals( 4, len(aliases))
        else:
            my.assertEquals( 3, len(aliases))
        # the membership test yields a bool, so comparing it against None
        # could never fail; assert the actual expectation instead
        my.assertEquals( True, "alias" in aliases )

        # create a naming
        naming = SearchType.create("config/naming")
        naming.set_value("search_type", "unittest/person")
        naming.set_value("context", "alias")
        naming.set_value("dir_naming", "alias")
        naming.set_value("file_naming", "text.txt")
        naming.set_value("base_dir_alias", "alias")
        naming.commit()

        # create 2nd naming, which also sets a checkin_type
        naming = SearchType.create("config/naming")
        naming.set_value("search_type", "unittest/person")
        naming.set_value("context", "alias2")
        naming.set_value("dir_naming", "alias2")
        naming.set_value("base_dir_alias", "alias2")
        naming.set_value("file_naming", "text.txt")
        naming.set_value("checkin_type", "auto")
        naming.commit()

        my.clear_naming()

        for context in ['alias', 'alias2']:
            # create a new test.txt file; the with block closes the
            # handle even if the write fails
            file_path = "./test.txt"
            with open(file_path, 'w') as handle:
                handle.write("whatever")

            checkin = FileCheckin(my.person, file_path, context=context)
            checkin.execute()
            snapshot = checkin.get_snapshot()

            # base dir of the alias followed by the dir_naming
            lib_dir = snapshot.get_lib_dir()
            expected = "/tmp/tactic/%s/%s" % (context, context)
            my.assertEquals(expected, lib_dir)

            # the file is renamed according to file_naming
            path = "%s/text.txt" % (lib_dir)
            exists = os.path.exists(path)
            my.assertEquals(True, exists)
开发者ID:hellios78,项目名称:TACTIC,代码行数:58,代码来源:checkin_test.py


示例5: get_message

    def get_message(my):
        '''Return the body of the notification.

        When the notification defines a "message", it is evaluated as an
        expression against my.sobject with "prev_data", "update_data" and,
        if present in the input, "snapshot" available as env sobjects.
        Otherwise a default message built from the search type title, the
        sobject name and the subject is returned.
        '''
        search_type_obj = my.sobject.get_search_type_obj()
        title = search_type_obj.get_title()
        subject = my.get_subject()
        template = my.notification.get_value("message")

        if not template:
            # no custom message: fall back to the default report
            message = "%s %s" % (title, my.sobject.get_name())
            message = '%s\n\nReport from transaction:\n%s\n' % (message, subject)
            return message

        # parse it through the expression
        sudo = Sudo()
        parser = ExpressionParser()
        snapshot = my.input.get('snapshot')

        # turn prev_data and update_data from input into virtual sobjects
        virtual_sobjects = {}
        for input_key in ("prev_data", "update_data"):
            virtual = SearchType.create("sthpw/virtual")
            id_col = virtual.get_id_col()
            if id_col:
                del virtual.data[id_col]

            input_values = my.input.get(input_key)
            if input_values:
                for name, value in input_values.items():
                    if value != None:
                        virtual.set_value(name, value)

            virtual_sobjects[input_key] = virtual

        env_sobjects = {}
        if snapshot:
            env_sobjects['snapshot'] = snapshot
        env_sobjects['prev_data'] = virtual_sobjects["prev_data"]
        env_sobjects['update_data'] = virtual_sobjects["update_data"]

        evaluated = parser.eval(template, my.sobject, env_sobjects=env_sobjects, mode='string')
        del sudo
        return evaluated
开发者ID:0-T-0,项目名称:TACTIC,代码行数:57,代码来源:email_handler.py


示例6: execute

    def execute(my):
        '''Add the sobjects given by the "search_keys" kwarg to the
        collection given by the "collection_key" kwarg.

        Sobjects whose code is already linked to the collection are
        skipped.  When the search type has a "keywords" column, the
        keywords of the collection are merged into each added sobject.

        Raises Exception when the collection cannot be found.
        '''

        collection_key = my.kwargs.get("collection_key")
        search_keys = my.kwargs.get("search_keys")

        collection = Search.get_by_search_key(collection_key)
        if not collection:
            raise Exception("Collection does not exist")

        # the link table is named <namespace>/<table>_in_<table>
        search_type = collection.get_base_search_type()
        type_parts = search_type.split("/")
        collection_type = "%s/%s_in_%s" % (type_parts[0], type_parts[1], type_parts[1])

        # codes that are already part of the collection
        item_search = Search(collection_type)
        item_search.add_filter("parent_code", collection.get_code())
        existing_items = item_search.get_sobjects()
        existing_codes = set([item.get_value("search_code") for item in existing_items])

        has_keywords = SearchType.column_exists(search_type, "keywords")
        if has_keywords:
            raw_keywords = collection.get_value("keywords", no_exception=True)
            collection_keywords = set(raw_keywords.split(" "))

        # create new items
        for sobject in Search.get_by_search_keys(search_keys):
            if sobject.get_code() in existing_codes:
                continue

            link = SearchType.create(collection_type)
            link.set_value("parent_code", collection.get_code())
            link.set_value("search_code", sobject.get_code())
            link.commit()

            if not has_keywords:
                continue

            # copy the metadata of the collection
            own_keywords = set(sobject.get_value("keywords").split(" "))
            merged = own_keywords.union(collection_keywords)
            sobject.set_value("keywords", " ".join(merged))
            sobject.commit()
开发者ID:asmboom,项目名称:TACTIC,代码行数:57,代码来源:collection_wdg.py


示例7: execute

    def execute(my):
        '''Add the columns described by the "values" kwarg to the search
        type given by the "search_type" kwarg.

        For every entry with a non-empty name a column is added with
        ColumnAddCmd and a FormatElementWdg display element is appended to
        the "definition" widget config, which is created if missing.

        Raises TacticException for a non-ascii column name and
        CommandException for a column that is already defined.
        '''

        search_type = my.kwargs.get("search_type")
        column_info = SearchType.get_column_info(search_type)

        values = my.kwargs.get("values")

        # get the definition config for this search_type, creating it on demand
        from pyasm.search import WidgetDbConfig
        config = WidgetDbConfig.get_by_search_type(search_type, "definition")
        if not config:
            config = SearchType.create("config/widget_config")
            config.set_value("search_type", search_type)
            config.set_value("view", "definition")
            config.commit()
            config._init()

        for data in values:

            name = data.get("name").strip()
            if not name:
                continue

            # column names are restricted to ascii
            try:
                name.encode('ascii')
            except UnicodeEncodeError:
                raise TacticException('Column name needs to be in English. Non-English characters can be used in Title when performing [Edit Column Definition] afterwards.')

            if column_info.get(name):
                raise CommandException("Column [%s] is already defined" % name)

            display_format = data.get("format")
            fps = data.get("fps")
            data_type = data.get("data_type")

            from pyasm.command import ColumnAddCmd
            add_cmd = ColumnAddCmd(search_type, name, data_type)
            add_cmd.execute()

            # add a new widget to the definition
            display_options = {
                'format': display_format,
                'type': data_type,
                'fps': fps
            }
            config.append_display_element(name, 'tactic.ui.table.FormatElementWdg', options=display_options)

        config.commit_config()
开发者ID:0-T-0,项目名称:TACTIC,代码行数:55,代码来源:column_edit_wdg.py


示例8: set_templates

    def set_templates(self):
        '''Set the global "project" template from the web context name.

        The context name "default" is replaced by the default project.  A
        SecurityException is reported as a warning instead of propagating.
        '''

        web = WebContainer.get_web()
        template_value = web.get_full_context_name()
        if template_value == "default":
            template_value = Project.get_default_project()

        try:
            SearchType.set_global_template("project", template_value)
        except SecurityException as error:
            print("WARNING: ", error)
开发者ID:mincau,项目名称:TACTIC,代码行数:11,代码来源:top_wdg.py


示例9: execute

    def execute(my):
        '''Obtain a transaction_log sobject from the "transaction_xml" kwarg.

        The kwarg is handled in one of three ways:
          - a dictionary: a new sthpw/transaction_log sobject is filled
            with its values
          - an SObject: used as the transaction directly
          - anything else: treated as the raw transaction xml of a new
            transaction with login "admin", which is committed right away

        NOTE(review): in the visible part of this method "transaction",
        "file_mode", "timestamp" and "login" are never used after being
        set - the listing appears to be cut short.
        '''

        import types

        transaction_xml = my.kwargs.get("transaction_xml")
        # file_mode falls back to 'delayed' when it is not supplied
        file_mode = my.kwargs.get("file_mode")
        if not file_mode:
            file_mode = 'delayed'

        # if the first argument is a dictionary, then the whole
        # transaction sobject was passed through
        # NOTE: this is now the default
        if type(transaction_xml) == types.DictType:
            transaction_dict = transaction_xml
            transaction_xml = transaction_dict.get("transaction")
            timestamp = transaction_dict.get("timestamp")
            login = transaction_dict.get("login")

            # recreate the transaction, skipping names starting with "__",
            # the id and empty values
            transaction = SearchType.create("sthpw/transaction_log")
            for name, value in transaction_dict.items():
                if name.startswith("__"):
                    continue
                if name == 'id':
                    continue
                if value == None:
                    continue
                transaction.set_value(name, value)

        elif isinstance(transaction_xml, SObject):
            transaction = transaction_xml

        else:
            # Create a fake transaction.
            # This is only used for test purposes
            transaction = SearchType.create("sthpw/transaction_log")
            if transaction_xml:
                transaction.set_value("transaction", transaction_xml)
            else:
                print "WARNING: transaction xml is empty"
            transaction.set_value("login", "admin")

            # commit the new transaction.  This is the only case where
            # a transaction will not have a code, so it has to be committed.
            # The other case do not need to be committed because they will
            # already have codes and the transaction is committed in
            # RedoCmd
            #
            # a failed commit is reported and ends the method
            try:
                transaction.commit()
            except Exception, e:
                print "Failed to commit transaction [%s]: It may already exist. Skipping." % transaction.get_code()
                print str(e)
                return
开发者ID:0-T-0,项目名称:TACTIC,代码行数:54,代码来源:run_transaction_cmd.py


示例10: set_templates

    def set_templates(my):
        if my.context:
            context = my.context
        else:
            context = WebContainer.get_web().get_full_context_name()

        try:

            SearchType.set_global_template("project", context)

        except SecurityException, e:
            print "WARNING: ", e
开发者ID:blezek,项目名称:TACTIC,代码行数:12,代码来源:top_wdg.py


示例11: check

    def check(my):
        '''Verify that the sType given by the "search_type" kwarg may be
        deleted from the current project.

        Stores search_type, values, db_resource, database and
        search_type_obj on the instance.

        Returns True when the check passes.
        Raises TacticException when the database of the sType is neither
        the current project code nor 'sthpw'.
        '''
        my.search_type = my.kwargs.get("search_type")
        my.values = my.kwargs.get("values")

        my.db_resource = SearchType.get_db_resource_by_search_type(my.search_type)
        my.database = my.db_resource.get_database()
        my.search_type_obj = SearchType.get(my.search_type)
        if my.database != Project.get_project_code() and my.database !='sthpw':
            # the unreachable "return False" that followed this raise was removed
            raise TacticException('You are not allowed to delete the sType [%s] from another project [%s].' %(my.search_type, my.database))

        return True
开发者ID:blezek,项目名称:TACTIC,代码行数:12,代码来源:delete_wdg.py


示例12: check

    def check(self):
        '''Verify that the sType given by the "search_type" kwarg may be
        deleted from the current project.

        Stores search_type, values, db_resource, database and
        search_type_obj on the instance.

        Returns True when the check passes.
        Raises TacticException when the database of the sType is neither
        the current project code nor 'sthpw'.
        '''
        self.search_type = self.kwargs.get("search_type")
        self.values = self.kwargs.get("values")

        self.db_resource = SearchType.get_db_resource_by_search_type(self.search_type)
        self.database = self.db_resource.get_database()
        self.search_type_obj = SearchType.get(self.search_type)
        if self.database != Project.get_project_code() and self.database !='sthpw':
            # the unreachable "return False" that followed this raise was removed
            raise TacticException('You are not allowed to delete the sType [%s] from another project [%s].' %(self.search_type, self.database))

        return True
开发者ID:mincau,项目名称:TACTIC,代码行数:12,代码来源:delete_wdg.py


示例13: do_search

    def do_search(my):
        '''Find (or, in insert mode, create) the sobject to be edited and
        hand it to the widgets.  This widget has its own search mechanism.

        Sets my.current_id, and my.last_sobject when stepping with
        "Edit/Next".  Raises EditException when no sobject is found outside
        of insert mode.
        '''

        web = WebContainer.get_web()

        # id of the sobject that is to be edited
        edit_id = my.search_id

        search = None
        sobject = None
        search_type_base = SearchType.get(my.search_type).get_base_key()

        if my.mode != "insert":
            search = Search(my.search_type)

            # figure out which id to search for
            if web.get_form_value("do_edit") != "Edit/Next":
                my.current_id = edit_id
            else:
                raw_ids = web.get_form_value("%s_search_ids" % search_type_base)
                if raw_ids == "":
                    my.current_id = edit_id
                else:
                    # move on to the id after the current one, wrapping
                    # around to the first at the end of the list
                    id_list = raw_ids.split("|")
                    position = (id_list.index(str(edit_id)) + 1) % len(id_list)
                    my.current_id = id_list[position]

                    # remember the sobject that was just edited
                    last_search = Search(my.search_type)
                    last_search.add_id_filter(edit_id)
                    my.last_sobject = last_search.get_sobject()

            search.add_id_filter(my.current_id)
            sobject = search.get_sobject()
        else:
            # no existing sobject is edited: create a new one for insert
            sobject = SearchType.create(my.search_type)
            my.current_id = -1

            # prefilling default values if available
            for key in (web.get_form_keys() or []):
                sobject.set_value(key, web.get_form_value(key))

        if not sobject and my.current_id != -1:
            raise EditException("No SObject found")

        # set all of the widgets to contain this sobject
        my.set_sobjects([sobject], search)
开发者ID:funic,项目名称:TACTIC,代码行数:52,代码来源:edit_wdg.py


示例14: get_pipeline

    def get_pipeline(my, pipeline_xml, add_tasks=False, sobject=None):
        '''Create a pipeline from xml together with one config/process
        entry per process of the pipeline.

        pipeline_xml: the pipeline definition
        add_tasks: when True, a task is created for every process
        sobject: parent of the created tasks; required when add_tasks is
            True and the pipeline has processes

        Existing config/process entries with the same process names are
        deleted first.

        Returns (pipeline, processes_dict), where processes_dict maps each
        process name to its process sobject.
        Raises ValueError when a task has to be created without an sobject.
        '''

        pipeline = SearchType.create("sthpw/pipeline")
        pipeline.set_pipeline(pipeline_xml)
        pipeline.set_value("pipeline", pipeline_xml)
        pipeline.commit()

        process_names = pipeline.get_process_names()

        # delete the processes
        search = Search("config/process")
        search.add_filters("process", process_names)
        processes = search.get_sobjects()
        for process in processes:
            process.delete()

        # create new processes
        processes_dict = {}
        for process_name in process_names:

            # define the process nodes
            process = SearchType.create("config/process")
            process.set_value("process", process_name)
            process.set_value("pipeline_code", pipeline.get_code())
            process.set_json_value("workflow", {
                'on_complete': '''
                sobject.set_value('%s', "complete")
                ''' % process_name,
                'on_approve': '''
                sobject.set_value('%s', "approve")
                ''' % process_name,

            } )
            process.commit()

            processes_dict[process_name] = process

            if add_tasks:
                # this branch used to fail with a NameError (misspelled
                # SearchType and an undefined sobject); the parent is now
                # passed in by the caller
                if sobject is None:
                    raise ValueError("add_tasks requires an sobject to attach the tasks to")
                task = SearchType.create("sthpw/task")
                task.set_parent(sobject)
                task.set_value("process", process_name)
                task.commit()


        return pipeline, processes_dict
开发者ID:nuxping,项目名称:TACTIC,代码行数:51,代码来源:workflow_test.py


示例15: get_logins_by_id

 def get_logins_by_id(note_id):
     '''Return the logins that belong to a group subscribed to the given
     notification(s).

     note_id: a single notification id, or a list/tuple of ids

     Returns the login sobjects found by the search; an empty list when an
     empty list/tuple of ids is given.
     Raises ValueError/TypeError when an id in a list is not an integer
     value.
     '''
     if isinstance(note_id, (list, tuple)):
         # "in ()" is not valid SQL, and no id can match anyway
         if not note_id:
             return []
         # the ids end up in a raw where clause: force them to integers so
         # that nothing but numbers can reach the statement
         query_str = "in (%s)" % ",".join([str(int(x)) for x in note_id])
     else:
         query_str = "= %d" % note_id

     login_in_group = SearchType.get(LoginInGroup.SEARCH_TYPE)
     group_note = SearchType.get(GroupNotification.SEARCH_TYPE)
     search = Search(Login.SEARCH_TYPE)
     search.add_where('''"login" in (select "login" from "%s" where "login_group" in (select "login_group" from "%s" where "notification_id" %s)) ''' % (login_in_group.get_table(), group_note.get_table(), query_str))

     return search.get_sobjects()
开发者ID:mincau,项目名称:TACTIC,代码行数:14,代码来源:notification.py


示例16: _test_sobject_hierarchy

    def _test_sobject_hierarchy(my):

        # FIXME: this functionality has been disabled until further notice
        return

        snapshot_type = SearchType.create("sthpw/snapshot_type")
        snapshot_type.set_value("code", "maya_model")
        snapshot_type.commit()

        snapshot_type = SearchType.create("prod/snapshot_type")
        snapshot_type.set_value("code", "maya_model")
        snapshot_type.commit()

        snapshot_type = SnapshotType.get_by_code("maya_model")
开发者ID:funic,项目名称:TACTIC,代码行数:14,代码来源:biz_test.py


示例17: get_display

    def get_display(my):
        '''Build the filter box with a "Show All Types" checkbox and a
        search type selector.

        NOTE(review): the visible part only returns the filter div when
        the selected search type cannot be resolved; "widget" is never
        used and there is no return on the success path - the listing
        appears to be cut short.
        '''

        widget = Widget()

        div = DivWdg(css="filter_box")

        # checkbox toggling between all types and the custom ones only
        show_span = SpanWdg(css="med")
        show_span.add("Show All Types: ")
        checkbox = FilterCheckboxWdg("show_all_types")
        checkbox.set_persistence()
        show_span.add(checkbox)
        show_all_types = checkbox.get_value()
        div.add(show_span)

        span = SpanWdg(css="med")
        span.add("Search Type: ")

        select = SelectWdg("filter|search_type")
        select.add_empty_option("-- Select --")
        project = Project.get()
        project_type = project.get_base_type()
        search = Search("sthpw/search_object")
        if show_all_types:
            # search types of the project type namespace, of the project
            # itself, plus sthpw/task
            search.add_where(
                """
            namespace = '%s' or namespace = '%s' or search_type in ('sthpw/task')
            """
                % (project_type, project.get_code())
            )
        else:
            # show only the custom ones
            search.add_filter("namespace", project.get_code())

        search.add_order_by("title")
        sobjects = search.get_sobjects()

        # options use search_type as value and title as label
        select.set_sobjects_for_options(sobjects, "search_type", "title")
        # select.set_option("query", "sthpw/search_object|search_type|title")
        select.set_persistence()
        # resubmit the form whenever the selection changes
        select.add_event("onchange", "document.form.submit()")
        search_type = select.get_value()
        span.add(select)
        div.add(span)

        # make sure the current selection exists
        try:
            SearchType.get(search_type)
        except SearchException, e:
            return div
开发者ID:hellios78,项目名称:TACTIC,代码行数:49,代码来源:custom_view_app_wdg.py


示例18: _test_time

    def _test_time(my):
        '''Test timezone related behavior.

        Checks that a task's bid_start_date reads back unchanged before
        and after commit while a shifted value is stored in the database,
        that search expressions with odd search type strings do not fail,
        and that the automatically set timestamp of a note is within a
        minute of the current time.
        '''
        sobject = SearchType.create('sthpw/task')
        sobject.set_value('project_code','unittest')
        sobject.set_value('bid_start_date', '2014-11-11 05:00:00')
        time = sobject.get_value('bid_start_date')
        my.assertEquals(time, '2014-11-11 05:00:00')

        sobject.commit()

        # the value read from the sobject is unchanged after the commit
        time = sobject.get_value('bid_start_date')
        my.assertEquals(time, '2014-11-11 05:00:00')
        from pyasm.search import DbContainer
        sql = DbContainer.get('sthpw')
        db_value = sql.do_query('SELECT bid_start_date from task where id = %s'%sobject.get_id())

        # 2014-11-11 00:00:00 is actually written to the database
        # (the expected string ends with a space because %Z formats to
        # an empty string here)
        my.assertEquals(db_value[0][0].strftime('%Y-%m-%d %H:%M:%S %Z'), '2014-11-11 00:00:00 ')

        # an sType specified without a project but with an id could be a common human error
        # but it should handle that fine
        # (obj1..obj3 are not inspected; the evaluations only have to succeed)
        obj1 = Search.eval('@SOBJECT(unittest/person?project=unittest["id", "%s"])'%sobject.get_id(), single=True)
        obj2= Search.eval('@SOBJECT(unittest/person?id=2["id", "%s"])'%sobject.get_id(), single=True)
        obj3 = Search.eval('@SOBJECT(sthpw/task?id=2["id", "%s"])'%sobject.get_id(), single=True)
        task = Search.eval('@SOBJECT(sthpw/task["id", "%s"])'%sobject.get_id(), single=True)

        # EST and GMT diff is 5 hours
        my.assertEquals(task.get_value('bid_start_date'), '2014-11-11 05:00:00')


        # test NOW() auto conversion
        sobj = SearchType.create('sthpw/note')
        sobj.set_value('process','TEST')
        sobj.set_value('note','123')
        # no timestamp is set before the commit
        my.assertEquals(sobj.get_value('timestamp'), "")
        sobj.commit()

        # this is local commited time converted back to GMT
        committed_time = sobj.get_value('timestamp')

        from dateutil import parser
        committed_time = parser.parse(committed_time)

        from pyasm.common import SPTDate
        now = SPTDate.now()
        diff = now - committed_time
        # should be roughly the same minute, not hours apart
        my.assertEquals(diff.seconds < 60, True)
开发者ID:0-T-0,项目名称:TACTIC,代码行数:48,代码来源:biz_test.py


示例19: _test_trigger

    def _test_trigger(my):
        '''Run a one-process pipeline with a script trigger registered for
        the "process|action" event of that process.

        Builds the pipeline through my.get_pipeline(), clears the
        process' workflow, registers a config/trigger pointing at a
        config/custom_script and then calls the "process|pending" event.
        The method itself makes no assertions.
        '''

        # create a dummy sobject
        sobject = SearchType.create("unittest/person")

        pipeline_xml = '''
        <pipeline>
          <process type="action" name="a"/>
        </pipeline>
        '''
        pipeline, processes = my.get_pipeline(pipeline_xml)

        # blank out the workflow value of the process
        process = processes.get("a")
        process.set_value("workflow", "")
        process.commit()


        # generated key used as the folder of the script
        folder = Common.generate_alphanum_key()

        Trigger.clear_db_cache()
        event = "process|action"
        trigger = SearchType.create("config/trigger")
        trigger.set_value("event", event)
        trigger.set_value("process", process.get_code())
        trigger.set_value("mode", "same process,same transaction")
        trigger.set_value("script_path", "%s/process_trigger" % folder)
        trigger.commit()

        # the script the trigger's script_path refers to (folder/title)
        script = SearchType.create("config/custom_script")
        script.set_value("folder", folder)
        script.set_value("title", "process_trigger")
        script.set_value("script", '''
        print "---"
        for key, value in input.items():
            print key, value
        print "---"
        print "process: ", input.get("process")
        ''')
        script.commit()

        # Run the pipeline
        # (from here on "process" is the process name, not the sobject)
        process = "a"
        output = {
            "pipeline": pipeline,
            "sobject": sobject,
            "process": process
        }
        Trigger.call(my, "process|pending", output)
开发者ID:jayvdb,项目名称:TACTIC,代码行数:48,代码来源:workflow_test.py


示例20: execute

    def execute(my):
        assert my.db_resource
        assert my.table

        database = my.db_resource.get_database()

        from pyasm.search import Insert, Select, DbContainer, Search, Sql

        # get the data
        if not my.sobjects:
            search = Search("sthpw/search_object")

            # BAD assumption
            #search.add_filter("table", my.table)
            # create a search_type. This is bad assumption cuz it assumes project-specific search_type
            # should call set_search_type()
            if not my.search_type:
                my.search_type = "%s/%s" % (my.database, my.table)
            search.add_filter("search_type", my.search_type)

            my.search_type_obj = search.get_sobject()
            if not my.search_type_obj:
                if my.no_exception == False:
                    raise SqlException("Table [%s] does not have a corresponding search_type" % my.table)
                else:
                    return

            search_type = my.search_type_obj.get_base_key()
            search = Search(my.search_type)
            search.set_show_retired(True)
            my.sobjects = search.get_sobjects()
            
        # get the info for the table
        column_info = SearchType.get_column_info(my.search_type)

        for sobject in my.sobjects:
            print my.delimiter

            insert = Insert()
            insert.set_database(my.database)
            insert.set_table(my.table)

            data = sobject.data
            for name, value in data.items():

                if name in my.ignore_columns:
                    continue

                if not my.include_id and name == "id":
                    insert.set_value("id", '"%s_id_seq".nextval' % table, quoted=False)
                    #insert.set_value(name, value, quoted=False)
                elif value == None:
                    continue
                else:
                    # replace all of the \ with double \\
                    insert.set_value(name, value)

            print "%s" % insert.get_statement()
            print my.end_delimiter
            print
开发者ID:CeltonMcGrath,项目名称:TACTIC,代码行数:60,代码来源:sql_dumper.py



注:本文中的pyasm.search.SearchType类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python search.Transaction类代码示例发布时间:2022-05-25
下一篇:
Python search.SearchKey类代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap