• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python strings.expand_template函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pyLibrary.strings.expand_template函数的典型用法代码示例。如果您正苦于以下问题:Python expand_template函数的具体用法?Python expand_template怎么用?Python expand_template使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了expand_template函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: assertAlmostEqualValue

def assertAlmostEqualValue(test, expected, digits=None, places=None, msg=None, delta=None):
    """
    Snagged from unittest/case.py, then modified (Aug2014)

    Return quietly when `test` matches `expected` closely enough, raise
    AssertionError otherwise.

    :param test: THE VALUE BEING CHECKED
    :param expected: THE VALUE TO COMPARE AGAINST; None MEANS "NO EXPECTATION"
    :param digits: TOLERANCE, AS A BOUND ON log10 OF THE ABSOLUTE DIFFERENCE
    :param places: TOLERANCE RELATIVE TO THE MAGNITUDE OF test (15 WHEN NO TOLERANCE IS GIVEN)
    :param msg: OPTIONAL PREFIX FOR THE AssertionError MESSAGE
    :param delta: TOLERANCE, AS THE LARGEST ACCEPTED ABSOLUTE DIFFERENCE
    :raises TypeError: WHEN MORE THAN ONE OF digits, places, delta IS GIVEN
    """
    if expected == None or test == expected:
        # NOTHING EXPECTED, OR AN EXACT MATCH: NO FURTHER CHECKING REQUIRED
        return

    if not Math.is_number(expected):
        # EXPECTING AN EMPTY CONTAINER IS THE SAME AS EXPECTING NULL
        if (
            (isinstance(expected, list) and len(expected) == 0)
            or (isinstance(expected, Mapping) and not expected.keys())
        ) and test == None:
            return
        if test != expected:
            raise AssertionError(expand_template("{{test}} != {{expected}}", locals()))
        return

    # THE THREE TOLERANCE STYLES ARE MUTUALLY EXCLUSIVE
    num_param = sum(1 for option in (digits, places, delta) if option != None)
    if num_param > 1:
        raise TypeError("specify only one of digits, places or delta")

    if digits is not None:
        with suppress_exception:
            magnitude = Math.log10(abs(test - expected))
            if magnitude < digits:
                return

        template = "{{test}} != {{expected}} within {{digits}} decimal places"
    elif delta is not None:
        if abs(test - expected) <= delta:
            return

        template = "{{test}} != {{expected}} within {{delta}} delta"
    else:
        if places is None:
            places = 15

        with suppress_exception:
            magnitude = Math.log10(abs(test - expected))
            if magnitude < Math.ceiling(Math.log10(abs(test))) - places:
                return

        template = "{{test|json}} != {{expected|json}} within {{places}} places"

    # THE TEMPLATE PULLS test, expected AND THE TOLERANCE FROM THE LOCAL VARIABLES
    standardMsg = expand_template(template, locals())
    raise AssertionError(coalesce(msg, "") + ": (" + standardMsg + ")")
开发者ID:klahnakoski,项目名称:TestFailures,代码行数:55,代码来源:fuzzytestcase.py


示例2: _aggop

    def _aggop(self, query):
        """
        SINGLE ROW RETURNED WITH AGGREGATES

        Build the aggregate SELECT statement for `query`.

        :param query: QUERY WITH select (ONE CLAUSE, OR A LIST OF THEM) AND "from"
        :return: PAIR (sql, post) WHERE post(sql) RUNS THE STATEMENT THROUGH
                 self.db AND UNPACKS THE RESULT
        """
        # BOTH SHAPES OF select SHARE THE SAME STATEMENT TEMPLATE
        template = """
                SELECT
                    {{selects}}
                FROM
                    {{table}}
                {{where}}
            """

        def as_column(clause):
            # AGGREGATE EXPRESSION FOR ONE SELECT CLAUSE, ALIASED TO THE CLAUSE NAME
            expression = aggregates[clause.aggregate].replace("{{code}}", clause.value)
            return expression + " AS " + self.db.quote_column(clause.name)

        if isinstance(query.select, list):
            # RETURN SINGLE OBJECT WITH AGGREGATES
            # VALIDATE EVERY CLAUSE BEFORE BUILDING ANYTHING
            for clause in query.select:
                if clause.aggregate not in aggregates:
                    Log.error("Expecting all columns to have an aggregate: {{select}}", select=clause)

            columns = DictList()
            for clause in query.select:
                columns.append(as_column(clause))

            # NOTE(review): THIS BRANCH READS query.filter WHILE THE SINGLE-VALUE
            # BRANCH READS query.where - CONFIRM WHICH ONE IS INTENDED
            sql = expand_template(template, {
                "selects": SQL(",\n".join(columns)),
                "table": self._subquery(query["from"])[0],
                "where": self._where2sql(query.filter)
            })

            def first_row(sql):
                return self.db.column(sql)[0]

            return sql, first_row  # RETURNING SINGLE OBJECT WITH AGGREGATE VALUES

        # RETURN SINGLE VALUE
        single = query.select
        if single.aggregate not in aggregates:
            Log.error("Expecting all columns to have an aggregate: {{select}}", select=single)

        sql = expand_template(template, {
            "selects": SQL(as_column(single)),
            "table": self._subquery(query["from"])[0],
            "where": self._where2sql(query.where)
        })

        def post(sql):
            grid = self.db.column_query(sql)
            return grid[0][0]

        return sql, post  # RETURN SINGLE VALUE
开发者ID:klahnakoski,项目名称:MoDataSubmission,代码行数:52,代码来源:jx_usingMySQL.py


示例3: column_query

    def column_query(self, sql, param=None):
        """
        RETURN RESULTS IN [column][row_num] GRID

        :param sql: SQL TEXT; EXPANDED AS A TEMPLATE WHEN param IS GIVEN
        :param param: OPTIONAL TEMPLATE PARAMETERS, QUOTED WITH self.quote_param()
        :return: zip(*rows), ONE SEQUENCE PER COLUMN
        """
        self._execute_backlog()
        old_cursor = self.cursor
        try:
            if not old_cursor:  # ALLOW NON-TRANSACTIONAL READS
                self.cursor = self.db.cursor()
                self.cursor.execute("SET TIME_ZONE='+00:00'")
                self.cursor.close()
                self.cursor = self.db.cursor()

            if param:
                sql = expand_template(sql, self.quote_param(param))
            sql = self.preamble + outdent(sql)
            if self.debug:
                Log.note("Execute SQL:\n{{sql}}", sql=indent(sql))

            self.cursor.execute(sql)
            grid = [[utf8_to_unicode(c) for c in row] for row in self.cursor]
            # columns = [utf8_to_unicode(d[0]) for d in coalesce(self.cursor.description, [])]
            return zip(*grid)
        except Exception as e:
            if isinstance(e, InterfaceError) or e.message.find("InterfaceError") >= 0:
                Log.error("Did you close the db connection?", e)
            Log.error("Problem executing SQL:\n{{sql|indent}}",  sql= sql, cause=e,stack_depth=1)
        finally:
            # CLEANUP AFTER NON-TRANSACTIONAL READS.  THIS RUNS ON FAILURE TOO, SO A
            # FAILED READ DOES NOT LEAVE ITS TEMPORARY CURSOR IN self.cursor, WHERE
            # THE NEXT CALL WOULD MISTAKE IT FOR AN OPEN TRANSACTION'S CURSOR
            if not old_cursor and self.cursor:
                try:
                    self.cursor.close()
                except Exception:
                    # BEST EFFORT: DO NOT LET A FAILED close() HIDE THE ORIGINAL ERROR
                    pass
                self.cursor = None
开发者ID:klahnakoski,项目名称:MoTreeherder,代码行数:33,代码来源:mysql.py


示例4: quote_value

 def quote_value(self, value):
     """
     convert values to mysql code for the same
     mostly delegate directly to the mysql lib, but some exceptions exist

     :param value: THE VALUE TO RENDER AS SQL TEXT
     :return: "NULL" FOR NULL VALUES, OTHERWISE THE SQL TEXT FOR value
     """
     try:
         if value == None:  # "==" (NOT "is") KEPT AS WRITTEN IN THE ORIGINAL
             return "NULL"
         elif isinstance(value, SQL):
             if not value.param:
                 # value.template CAN BE MORE THAN A TEMPLATE STRING
                 return self.quote_sql(value.template)
             param = {k: self.quote_sql(v) for k, v in value.param.items()}
             return expand_template(value.template, param)
         elif isinstance(value, basestring):
             return self.db.literal(value)
         elif isinstance(value, datetime):
             return "str_to_date('" + value.strftime("%Y%m%d%H%M%S") + "', '%Y%m%d%H%i%s')"
         elif isinstance(value, Mapping) or hasattr(value, '__iter__'):
             # MAPPINGS ARE ITERABLE TOO, SO THE SEPARATE Mapping BRANCH THAT USED TO
             # FOLLOW THE __iter__ TEST COULD NEVER RUN; BOTH ARE STORED AS JSON TEXT
             return self.db.literal(json_encode(value))
         elif Math.is_number(value):
             return unicode(value)
         else:
             return self.db.literal(value)
     except Exception as e:
         Log.error("problem quoting SQL", e)
开发者ID:klahnakoski,项目名称:MoTreeherder,代码行数:28,代码来源:mysql.py


示例5: execute

    def execute(
        self,
        command,
        param=None,
        retry=True     # WHEN False, A FAILED command IS REPORTED THROUGH Log.error INSTEAD OF BEING RETRIED
    ):
        """
        Run `command` on the connection and commit.

        :param command: SQL TEXT; EXPANDED AS A TEMPLATE WHEN param IS GIVEN
        :param param: OPTIONAL TEMPLATE PARAMETERS, QUOTED WITH self.quote_param()
        :param retry: WHEN TRUE, RECONNECT AND RUN command AGAIN AFTER A FAILURE

        NOTE(review): WITH retry=True A command THAT KEEPS FAILING IS RE-RUN
        WITHOUT LIMIT OR DELAY - CONFIRM THAT IS INTENDED
        """
        if param:
            command = expand_template(command, self.quote_param(param))

        output = None
        done = False
        while not done:
            try:
                with self.locker:
                    if not self.connection:
                        self._connect()

                with Closer(self.connection.cursor()) as curs:
                    curs.execute(command)
                    if curs.rowcount >= 0:
                        output = curs.fetchall()
                self.connection.commit()
                done = True
            except Exception as e:
                try:
                    self.connection.rollback()
                    # TODO: FIGURE OUT WHY rollback() DOES NOT HELP
                    self.connection.close()
                except Exception:
                    # BEST EFFORT: THE CONNECTION IS REPLACED BELOW EITHER WAY
                    pass
                self.connection = None
                self._connect()
                if not retry:
                    Log.error("Problem with command:\n{{command|indent}}",  command= command, cause=e)
开发者ID:klahnakoski,项目名称:MoDataSubmission,代码行数:34,代码来源:redshift.py


示例6: write

    def write(self, template, params):
        """
        Accumulate the expanded message and send the email once enough time has passed.

        :param template: MESSAGE TEMPLATE
        :param params: TEMPLATE PARAMETERS; ONLY ENTRIES WITH A TRUTHY
                       params.params.warning.template ARE ACCUMULATED
        """
        with self.locker:
            # NOTE(review): THE ORIGINAL CONDITION WAS THIS SAME OPERAND WRITTEN TWICE
            # ("x or x"); THE SECOND OPERAND WAS PRESUMABLY MEANT TO BE A DIFFERENT
            # FIELD - CONFIRM AGAINST THE LOG ENTRY FORMAT
            if params.params.warning.template:
                self.accumulation.append(expand_template(template, params))

                if Date.now() > self.last_sent + WAIT_TO_SEND_MORE:
                    self._send_email()
开发者ID:klahnakoski,项目名称:Activedata-ETL,代码行数:7,代码来源:log_usingEmail.py


示例7: json2value

def json2value(json_string, params={}, flexible=False, leaves=False):
    """
    Decode JSON text into a wrapped Python value.

    :param json_string: THE JSON
    :param params: STANDARD JSON PARAMS
    :param flexible: REMOVE COMMENTS
    :param leaves: ASSUME JSON KEYS ARE DOT-DELIMITED
    :return: Python value

    NOTE(review): this excerpt ends before the handler of the outer ``try``;
    the rest of the function is not visible here.
    """
    # byte strings are rejected; only unicode text is accepted
    if isinstance(json_string, str):
        Log.error("only unicode json accepted")

    try:
        if flexible:
            # REMOVE """COMMENTS""", # COMMENTS, //COMMENTS, AND \n \r
            # DERIVED FROM https://github.com/jeads/datasource/blob/master/datasource/bases/BaseHub.py# L58
            # NOTE(review): re.MULTILINE only changes "^" and "$"; without re.DOTALL
            # the "." below does not match a newline, so a triple-quoted comment that
            # spans several lines is presumably left in place - confirm
            json_string = re.sub(r"\"\"\".*?\"\"\"", r"\n", json_string, flags=re.MULTILINE)
            json_string = "\n".join(remove_line_comment(l) for l in json_string.split("\n"))
            # ALLOW DICTIONARY'S NAME:VALUE LIST TO END WITH COMMA
            json_string = re.sub(r",\s*\}", r"}", json_string)
            # ALLOW LISTS TO END WITH COMMA
            json_string = re.sub(r",\s*\]", r"]", json_string)

        if params:
            # LOOKUP REFERENCES
            json_string = expand_template(json_string, params)

        try:
            value = wrap(json_decoder(unicode(json_string)))
        except Exception, e:
            # report the text that failed to decode, with the decoder error as the cause
            Log.error("can not decode\n{{content}}", content=json_string, cause=e)

        if leaves:
            value = wrap_leaves(value)

        return value
开发者ID:klahnakoski,项目名称:TestFailures,代码行数:35,代码来源:convert.py


示例8: forall

    def forall(self, sql, param=None, _execute=None):
        """
        Run `sql` and call `_execute` once per result row.

        :param sql: SQL TEXT; EXPANDED AS A TEMPLATE WHEN param IS GIVEN
        :param param: OPTIONAL TEMPLATE PARAMETERS, QUOTED WITH self.quote_param()
        :param _execute: CALLABLE THAT RECEIVES EACH ROW AS A WRAPPED {column: value} dict
        """
        assert _execute
        num = 0

        self._execute_backlog()
        old_cursor = self.cursor
        try:
            if not old_cursor:  # ALLOW NON-TRANSACTIONAL READS
                self.cursor = self.db.cursor()

            if param:
                sql = expand_template(sql, self.quote_param(param))
            sql = self.preamble + outdent(sql)
            if self.debug:
                Log.note("Execute SQL:\n{{sql}}",  sql= indent(sql))
            self.cursor.execute(sql)

            columns = tuple([utf8_to_unicode(d[0]) for d in self.cursor.description])
            for r in self.cursor:
                num += 1
                _execute(wrap(dict(zip(columns, [utf8_to_unicode(c) for c in r]))))

        except Exception as e:
            Log.error("Problem executing SQL:\n{{sql|indent}}",  sql= sql, cause=e, stack_depth=1)
        finally:
            # CLEANUP AFTER NON-TRANSACTIONAL READS.  THIS RUNS ON FAILURE TOO, SO A
            # FAILED READ DOES NOT LEAVE ITS TEMPORARY CURSOR IN self.cursor, WHERE
            # THE NEXT CALL WOULD MISTAKE IT FOR AN OPEN TRANSACTION'S CURSOR
            if not old_cursor and self.cursor:
                try:
                    self.cursor.close()
                except Exception:
                    # BEST EFFORT: DO NOT LET A FAILED close() HIDE THE ORIGINAL ERROR
                    pass
                self.cursor = None
开发者ID:klahnakoski,项目名称:MoTreeherder,代码行数:28,代码来源:mysql.py


示例9: json2value

def json2value(json_string, params=None, flexible=False, leaves=False):
    """
    Decode JSON text into a wrapped Python value.

    :param json_string: THE JSON
    :param params: STANDARD JSON PARAMS (TEMPLATE VALUES EXPANDED INTO THE TEXT BEFORE DECODING)
    :param flexible: REMOVE COMMENTS
    :param leaves: ASSUME JSON KEYS ARE DOT-DELIMITED
    :return: Python value

    Decoding problems are reported through Log.error, with a pointer to the
    offending column when the decoder message names a line and column.
    """
    if isinstance(json_string, str):
        Log.error("only unicode json accepted")

    try:
        if flexible:
            # REMOVE """COMMENTS""", # COMMENTS, //COMMENTS, AND \n \r
            # DERIVED FROM https://github.com/jeads/datasource/blob/master/datasource/bases/BaseHub.py# L58
            # re.DOTALL LETS "." CROSS LINE BREAKS, SO A """COMMENT""" THAT SPANS
            # SEVERAL LINES IS REMOVED TOO (re.MULTILINE ALONE ONLY AFFECTS ^ AND $)
            json_string = re.sub(r"\"\"\".*?\"\"\"", r"\n", json_string, flags=re.MULTILINE | re.DOTALL)
            json_string = "\n".join(remove_line_comment(l) for l in json_string.split("\n"))
            # ALLOW DICTIONARY'S NAME:VALUE LIST TO END WITH COMMA
            json_string = re.sub(r",\s*\}", r"}", json_string)
            # ALLOW LISTS TO END WITH COMMA
            json_string = re.sub(r",\s*\]", r"]", json_string)

        if params:
            # LOOKUP REFERENCES
            json_string = expand_template(json_string, params)

        value = wrap(json_decoder(json_string))

        if leaves:
            value = wrap_leaves(value)

        return value

    except Exception as e:
        e = Except.wrap(e)
        if "Expecting '" in e and "' delimiter: line" in e:
            # THE DECODER MESSAGE CARRIES 1-BASED LINE AND COLUMN NUMBERS
            line_index = int(strings.between(e.message, " line ", " column ")) - 1
            column = int(strings.between(e.message, " column ", " ")) - 1
            line = json_string.split("\n")[line_index].replace("\t", " ")
            if column > 20:
                # KEEP 20 CHARACTERS OF CONTEXT BEFORE THE OFFENDING COLUMN
                sample = "..." + line[column - 20:]
                pointer = "   " + (" " * 20) + "^"
            else:
                sample = line
                pointer = (" " * column) + "^"

            if len(sample) > 43:
                sample = sample[:43] + "..."

            Log.error("Can not decode JSON at:\n\t" + sample + "\n\t" + pointer + "\n")

        base_str = unicode2utf8(strings.limit(json_string, 1000))
        hexx_str = bytes2hex(base_str, " ")
        try:
            # ONE PRINTABLE CHARACTER (OR ".") PER BYTE.  THE join() MUST WRAP THE WHOLE
            # GENERATOR; THE ORIGINAL PARENTHESES ADDED A STRING TO A GENERATOR, WHICH
            # ALWAYS FAILED AND LEFT char_str EMPTY
            char_str = " " + "  ".join((c.decode("latin1") if ord(c) >= 32 else ".") for c in base_str)
        except Exception:
            char_str = " "
        Log.error("Can not decode JSON:\n" + char_str + "\n" + hexx_str + "\n", e)
开发者ID:mozilla,项目名称:ChangeDetector,代码行数:59,代码来源:convert.py


示例10: execute

    def execute(self, sql, param=None):
        """
        Queue `sql` on the backlog of the current transaction.

        :param sql: SQL TEXT; EXPANDED AS A TEMPLATE WHEN param IS GIVEN
        :param param: OPTIONAL TEMPLATE PARAMETERS, QUOTED WITH self.quote_param()

        The backlog is executed right away when self.debug is set, or once it
        holds MAX_BATCH_SIZE statements.
        """
        if self.transaction_level == 0:
            Log.error("Expecting transaction to be started before issuing queries")

        statement = expand_template(sql, self.quote_param(param)) if param else sql
        self.backlog.append(outdent(statement))

        if self.debug or len(self.backlog) >= MAX_BATCH_SIZE:
            self._execute_backlog()
开发者ID:klahnakoski,项目名称:MoTreeherder,代码行数:10,代码来源:mysql.py


示例11: compileString2Term

def compileString2Term(edge):
    """
    Build the term translators for a string edge.

    :param edge: EDGE WITH value (A KEYWORD) AND domain; esscript IS NOT SUPPORTED
    :return: Dict WITH toTerm (HEAD/BODY OF THE EXPRESSION THAT READS THE VALUE)
             AND fromTerm (MAPS A TERM TO ITS DOMAIN PART)
    """
    if edge.esscript:
        Log.error("edge script not supported yet")

    def fromTerm(value):
        return edge.domain.getPartByKey(value)

    value = edge.value
    if not isKeyword(value):
        Log.error("not handled")
    else:
        quoted_path = convert.string2quote(value)
        value = strings.expand_template("getDocValue({{path}})", {"path": quoted_path})

    to_term = {"head": "", "body": value}
    return Dict(toTerm=to_term, fromTerm=fromTerm)
开发者ID:klahnakoski,项目名称:MoDevETL,代码行数:14,代码来源:util.py


示例12: assertAlmostEqualValue

def assertAlmostEqualValue(test, expected, digits=None, places=None, msg=None, delta=None):
    """
    Snagged from unittest/case.py, then modified (Aug2014)

    Compare `test` to `expected`, allowing a tolerance given by at most one
    of `digits`, `places` or `delta` (TypeError when more than one is given).

    NOTE(review): this excerpt ends inside the `digits` branch; the handling
    of `delta`/`places` and the final raise are not visible here.
    """
    if test == expected:
        # shortcut
        return

    if not Math.is_number(expected):
        # SOME SPECIAL CASES, EXPECTING EMPTY CONTAINERS IS THE SAME AS EXPECTING NULL
        if isinstance(expected, list) and len(expected) == 0 and test == None:
            return
        if isinstance(expected, Mapping) and not expected.keys() and test == None:
            return
        if test != expected:
            # the template reads `test` and `expected` from the local variables
            raise AssertionError(expand_template("{{test}} != {{expected}}", locals()))
        return

    # count how many of the mutually exclusive tolerance options were supplied
    num_param = 0
    if digits != None:
        num_param += 1
    if places != None:
        num_param += 1
    if delta != None:
        num_param += 1
    if num_param > 1:
        raise TypeError("specify only one of digits, places or delta")

    if digits is not None:
        try:
            diff = Math.log10(abs(test - expected))
            if diff < digits:
                return
        except Exception, e:
            # any failure in the comparison is ignored and falls through to the message below
            pass

        standardMsg = expand_template("{{test}} != {{expected}} within {{digits}} decimal places", locals())
开发者ID:mozilla,项目名称:ChangeDetector,代码行数:37,代码来源:fuzzytestcase.py


示例13: get_job_classification

    def get_job_classification(self, branch, revision):
        """
        Log every non-empty note attached to the jobs of a revision.

        :param branch: BRANCH NAME, EXPANDED INTO EACH REQUEST URL
        :param revision: REVISION ID; ONLY THE FIRST 12 CHARACTERS ARE SENT

        Walks result sets -> jobs -> notes, one http.get_json() request per level.
        """
        result_set_url = expand_template(RESULT_SET_URL, {"branch": branch, "revision": revision[:12]})
        result_sets = http.get_json(result_set_url)
        for result_set in result_sets.results:
            jobs_url = expand_template(JOBS_URL, {"branch": branch, "result_set_id": result_set.id})

            for job in http.get_json(jobs_url):
                notes_url = expand_template(NOTES_URL, {"branch": branch, "job_id": job.id})
                for entry in http.get_json(notes_url):
                    if entry.note:
                        # ENTRIES WITHOUT NOTE TEXT ARE SKIPPED
                        Log.note(
                            "{{note|json}}",
                            note={
                                "job_id": job.id,
                                "result_set_id": result_set.id,
                                "branch": branch,
                                "revision": result_set.revision,
                                "failure_classification_id": job.failure_classification_id,
                                "result": job.result,
                                "note_timestamp": entry.timestamp,
                                "note": entry.note
                            }
                        )
开发者ID:klahnakoski,项目名称:Activedata-ETL,代码行数:24,代码来源:treeherder.py


示例14: assertAlmostEqualValue

def assertAlmostEqualValue(first, second, digits=None, places=None, msg=None, delta=None):
    """
    Snagged from unittest/case.py, then modified (Aug2014)

    Return quietly when `first` and `second` are equal or close enough, raise
    AssertionError otherwise.

    :param first: THE VALUE BEING CHECKED
    :param second: THE VALUE TO COMPARE AGAINST
    :param digits: USED AS places WHEN places IS NOT GIVEN
    :param places: TOLERANCE RELATIVE TO THE MAGNITUDE OF first (18 WHEN NO TOLERANCE IS GIVEN)
    :param msg: OPTIONAL PREFIX FOR THE AssertionError MESSAGE
    :param delta: TOLERANCE, AS THE LARGEST ACCEPTED ABSOLUTE DIFFERENCE
    :raises TypeError: WHEN BOTH delta AND places (OR digits) ARE GIVEN
    """
    if first == second:
        # shortcut
        return

    places = places if places is not None else digits
    if delta is not None and places is not None:
        raise TypeError("specify delta or places not both")

    if delta is not None:
        if abs(first - second) <= delta:
            return

        standardMsg = expand_template("{{first}} != {{second}} within {{delta}} delta", {
            "first": first,
            "second": second,
            "delta": delta
        })
    else:
        if places is None:
            places = 18

        try:
            diff = log10(abs(first-second))
            if diff < Math.ceiling(log10(abs(first)))-places:
                return
        except ValueError:
            # log10(0) WHEN first IS ZERO: NO RELATIVE MAGNITUDE TO COMPARE AGAINST,
            # SO REPORT THE MISMATCH INSTEAD OF LEAKING A MATH DOMAIN ERROR
            pass

        # THE KEY MUST BE "places" TO FILL {{places}}; IT USED TO BE THE EMPTY STRING
        standardMsg = expand_template("{{first}} != {{second}} within {{places}} places", {
            "first": first,
            "second": second,
            "places": places
        })

    raise AssertionError(nvl(msg, "") + ": (" + standardMsg + ")")
开发者ID:klahnakoski,项目名称:Datazilla2ElasticSearch,代码行数:36,代码来源:fuzzytestcase.py


示例15: add

 def add(self, data):
     """
     Stamp `data` with a new uid and put it on the temporary queue.

     :param data: THE RECORD TO STORE; WRAPPED, THEN TAGGED WITH ITS ID, SOURCE LINK AND UID
     :return: PAIR (link, count) - THE EXPANDED LINK_PATTERN AND THE COUNT FROM self.uid.advance()
     """
     record = wrap(data)
     uid, count = self.uid.advance()

     location = self.bucket.settings
     link = expand_template(
         LINK_PATTERN,
         {"region": location.region, "bucket": location.bucket, "uid": uid}
     )

     record.etl.id = count
     record.etl.source.href = link
     record[UID_PATH] = uid
     self.temp_queue.add(record)
     return link, count
开发者ID:klahnakoski,项目名称:MoDataSubmission,代码行数:16,代码来源:storage.py


示例16: __str__

    def __str__(self):
        """
        Build the text for this object: "<type>: <template>" expanded with
        self.params, followed by the indented trace and the text of each cause.

        NOTE(review): this excerpt ends without returning `output`; the rest
        of the method is not visible here.
        """
        output = self.type + ": " + self.template + "\n"
        if self.params:
            output = expand_template(output, self.params)

        if self.trace:
            output += indent(format_trace(self.trace))

        if self.cause:
            cause_strings = []
            for c in listwrap(self.cause):
                try:
                    cause_strings.append(unicode(c))
                except Exception, e:
                    # a cause that can not be rendered as text is left out
                    pass

            output += "caused by\n\t" + "and caused by\n\t".join(cause_strings)
开发者ID:klahnakoski,项目名称:intermittents,代码行数:17,代码来源:logs.py


示例17: quote_sql

 def quote_sql(self, value, param=None):
     """
     USED TO EXPAND THE PARAMETERS TO THE SQL() OBJECT

     :param value: SQL OBJECT, STRING, MAPPING, ITERABLE OR OTHER VALUE
     :param param: OPTIONAL TEMPLATE PARAMETERS; ONLY USED WHEN value IS AN SQL OBJECT
     :return: SQL OBJECTS WITHOUT param AND PLAIN STRINGS ARE RETURNED UNCHANGED,
              MAPPINGS BECOME A QUOTED JSON LITERAL, OTHER ITERABLES BECOME
              "(a,b,...)", ANYTHING ELSE BECOMES unicode(value)
     """
     try:
         if isinstance(value, SQL):
             if not param:
                 return value
             # QUOTE EACH PARAMETER BEFORE IT IS EXPANDED INTO THE TEMPLATE
             param = {k: self.quote_sql(v) for k, v in param.items()}
             return expand_template(value, param)
         elif isinstance(value, basestring):
             return value
         elif isinstance(value, Mapping):
             return self.db.literal(json_encode(value))
         elif hasattr(value, '__iter__'):
             return "(" + ",".join([self.quote_sql(vv) for vv in value]) + ")"
         else:
             return unicode(value)
     except Exception as e:
         Log.error("problem quoting SQL", e)
开发者ID:klahnakoski,项目名称:MoTreeherder,代码行数:20,代码来源:mysql.py


示例18: execute_sql

    def execute_sql(
        host,
        username,
        password,
        sql,
        schema=None,
        param=None,
        settings=None
    ):
        """
        EXECUTE MANY LINES OF SQL (FROM SQLDUMP FILE, MAYBE?)

        :param sql: THE SQL TEXT; EXPANDED AS A TEMPLATE WHEN param IS GIVEN
        :param param: OPTIONAL TEMPLATE PARAMETERS, QUOTED THROUGH A TEMPORARY MySQL(settings)
        :param settings: THE CONNECTION DETAILS ACTUALLY USED BELOW (host, username,
                         password, schema/database)

        NOTE(review): host, username, password AND schema ARE NOT READ DIRECTLY;
        PRESUMABLY THEY ARE FOLDED INTO settings BEFORE THE BODY RUNS - CONFIRM
        """
        settings.schema = coalesce(settings.schema, settings.database)

        if param:
            with MySQL(settings) as temp:
                sql = expand_template(sql, temp.quote_param(param))

        # We have no way to execute an entire SQL file in bulk, so we
        # have to shell out to the commandline client.
        # NOTE(review): THE PASSWORD IS PASSED AS A COMMAND-LINE ARGUMENT, WHICH MAY
        # BE VISIBLE TO OTHER LOCAL USERS IN THE PROCESS LIST - CONSIDER AN OPTION FILE
        args = [
            "mysql",
            "-h{0}".format(settings.host),
            "-u{0}".format(settings.username),
            "-p{0}".format(settings.password)
        ]
        if settings.schema:
            args.append("{0}".format(settings.schema))

        try:
            proc = subprocess.Popen(
                args,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                bufsize=-1
            )
            # THE CLIENT READS BYTES FROM STDIN
            if isinstance(sql, unicode):
                sql = sql.encode("utf8")
            (output, _) = proc.communicate(sql)
        except Exception as e:
            Log.error("Can not call \"mysql\"", e)
开发者ID:klahnakoski,项目名称:MoTreeherder,代码行数:40,代码来源:mysql.py


示例19: time_delta_pusher

def time_delta_pusher(please_stop, appender, queue, interval):
    """
    appender - THE FUNCTION THAT ACCEPTS A STRING
    queue - FILLED WITH LOG ENTRIES {"template":template, "params":params} TO WRITE
    interval - timedelta
    USE IN A THREAD TO BATCH LOGS BY TIME INTERVAL

    please_stop - SIGNAL THAT ENDS THE LOOP; ALSO TRIGGERED WHEN Thread.STOP IS FOUND IN THE QUEUE
    """

    if not isinstance(interval, timedelta):
        Log.error("Expecting interval to be a timedelta")

    next_run = datetime.utcnow() + interval

    while not please_stop:
        Thread.sleep(till=next_run)
        next_run = datetime.utcnow() + interval
        logs = queue.pop_all()
        if logs:
            lines = []
            for log in logs:
                try:
                    if log is Thread.STOP:
                        please_stop.go()
                        # DO NOT WAIT A FULL INTERVAL AFTER THE STOP MARKER
                        next_run = datetime.utcnow()
                    else:
                        expanded = expand_template(log.get("template"), log.get("params"))
                        lines.append(expanded)
                except Exception as e:
                    Log.warning("Trouble formatting logs", e)
                    # SWALLOW ERROR, GOT TO KEEP RUNNING
            try:
                if DEBUG_LOGGING and please_stop:
                    sys.stdout.write("Call to appender with " + str(len(lines)) + " lines\n")
                appender(u"\n".join(lines) + u"\n")
                if DEBUG_LOGGING and please_stop:
                    sys.stdout.write("Done call to appender with " + str(len(lines)) + " lines\n")
            except Exception as e:
                # THE LOGGER ITSELF IS FAILING, SO REPORT STRAIGHT TO stderr
                sys.stderr.write("Trouble with appender: " + str(e.message) + "\n")
开发者ID:klahnakoski,项目名称:intermittents,代码行数:38,代码来源:log_usingThreadedStream.py


示例20: write

 def write(self, template, params):
     """
     Expand `template` with `params` and append the text to self.file
     while holding self.file_lock.
     """
     with self.file_lock:
         line = expand_template(template, params)
         self.file.append(line)
开发者ID:klahnakoski,项目名称:MoDataSubmission,代码行数:3,代码来源:text_logs.py



注:本文中的pyLibrary.strings.expand_template函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python strings.utf82unicode函数代码示例发布时间:2022-05-25
下一篇:
Python jx.sort函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap