• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python strings.expand_template函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中mo_logs.strings.expand_template函数的典型用法代码示例。如果您正苦于以下问题：Python expand_template函数的具体用法？Python expand_template怎么用？Python expand_template使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了expand_template函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: assertAlmostEqualValue

def assertAlmostEqualValue(test, expected, digits=None, places=None, msg=None, delta=None):
    """
    Assert that `test` matches `expected`, exactly or within a numeric tolerance.

    Snagged from unittest/case.py, then modified (Aug2014)

    :param test: the observed value
    :param expected: the value to compare against; None means "no expectation"
    :param digits: passes when log10 of the absolute difference is below this
    :param places: passes when log10 of the absolute difference is below the
                   magnitude of `test` minus this many places (defaults to 15)
    :param msg: optional text placed in front of the failure message
    :param delta: largest absolute difference that still passes
    :raises AssertionError: when the values do not match
    :raises TypeError: when more than one of digits, places, delta is given
    """
    # THE TEMPLATES BELOW ARE EXPANDED AGAINST locals(): THE PARAMETER NAMES
    # (test, expected, digits, places, delta) MUST STAY AS THEY ARE
    if expected is NULL:
        if test == None:  # pandas dataframes reject any comparison with an exception!
            return
        raise AssertionError(expand_template("{{test}} != {{expected}}", locals()))

    # None has no expectations; an exact match needs no further checks
    if expected == None or test == expected:
        return

    if not is_number(expected):
        # SOME SPECIAL CASES, EXPECTING EMPTY CONTAINERS IS THE SAME AS EXPECTING NULL
        expecting_nothing = (
            (is_list(expected) and len(expected) == 0)
            or (is_data(expected) and not expected.keys())
        )
        if expecting_nothing and test == None:
            return
        if test != expected:
            raise AssertionError(expand_template("{{test}} != {{expected}}", locals()))
        return

    # AT MOST ONE KIND OF TOLERANCE MAY BE REQUESTED
    tolerances_given = sum(1 for option in (digits, places, delta) if option != None)
    if tolerances_given > 1:
        raise TypeError("specify only one of digits, places or delta")

    if digits is not None:
        # ANY FAILURE WHILE MEASURING IS SUPPRESSED AND TREATED AS A MISMATCH
        with suppress_exception:
            if log10(abs(test-expected)) < digits:
                return

        failure = expand_template("{{test}} != {{expected}} within {{digits}} decimal places", locals())
    elif delta is not None:
        if abs(test - expected) <= delta:
            return

        failure = expand_template("{{test}} != {{expected}} within {{delta}} delta", locals())
    else:
        if places is None:
            places = 15

        with suppress_exception:
            magnitude = mo_math.log10(abs(test-expected))
            if magnitude < mo_math.ceiling(mo_math.log10(abs(test)))-places:
                return

        failure = expand_template("{{test|json}} != {{expected|json}} within {{places}} places", locals())

    raise AssertionError(coalesce(msg, "") + ": (" + failure + ")")
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:60,代码来源:fuzzytestcase.py


示例2: _setup_grcov

    def _setup_grcov(self):
        """
        Install gcc, then fetch and unpack grcov into ~/ActiveData-ETL.

        The latest grcov release is looked up through the GitHub API; every
        asset whose download url mentions the configured platform is
        downloaded with wget, unpacked with tar, and the archive removed.
        """
        sudo("apt-get install -y gcc")

        grcov = self.settings.grcov
        latest = http.get_json("https://api.github.com/repos/marco-c/grcov/releases/latest")
        with cd("~/ActiveData-ETL"):
            for asset in latest.assets:
                url = asset.browser_download_url
                if grcov.platform not in url:
                    continue  # ASSET IS FOR SOME OTHER PLATFORM
                run("wget "+url)
                run(expand_template("tar xf grcov-{{platform}}.tar.bz2", grcov))
                run(expand_template("rm grcov-{{platform}}.tar.bz2", grcov))
开发者ID:klahnakoski,项目名称:SpotManager,代码行数:10,代码来源:etl.py


示例3: _aggop

    def _aggop(self, query):
        """
        SINGLE ROW RETURNED WITH AGGREGATES

        :param query: query whose select clause(s) must all name a known aggregate
        :return: pair (sql, post); post(sql) runs the statement against
                 self.db and extracts the result
        """
        if not isinstance(query.select, list):
            # RETURN SINGLE VALUE
            column = query.select
            if column.aggregate not in aggregates:
                Log.error("Expecting all columns to have an aggregate: {{select}}", select=column)

            aliased = sql_alias(aggregates[column.aggregate].replace("{{code}}", column.value) , quote_column(column.name))

            sql = expand_template("""
                SELECT
                    {{selects}}
                FROM
                    {{table}}
                {{where}}
            """, {
                "selects": SQL(aliased),
                "table": self._subquery(query["from"])[0],
                "where": self._where2sql(query.where)
            })

            def first_cell(sql):
                # FIRST ROW OF THE FIRST COLUMN
                grid = self.db.column_query(sql)
                return grid[0][0]

            return sql, first_cell  # RETURN SINGLE VALUE
        else:
            # RETURN SINGLE OBJECT WITH AGGREGATES
            # VALIDATE EVERYTHING BEFORE BUILDING ANYTHING
            for column in query.select:
                if column.aggregate not in aggregates:
                    Log.error("Expecting all columns to have an aggregate: {{select}}", select=column)

            selects = FlatList()
            for column in query.select:
                expression = aggregates[column.aggregate].replace("{{code}}", column.value)
                selects.append(sql_alias(expression,quote_column(column.name)))

            # NOTE(review): this branch filters on query.filter and reads with
            # self.db.column(), the single-value branch uses query.where and
            # self.db.column_query() - confirm which pair is intended
            sql = expand_template("""
                SELECT
                    {{selects}}
                FROM
                    {{table}}
                {{where}}
            """, {
                "selects": SQL(",\n".join(selects)),
                "table": self._subquery(query["from"])[0],
                "where": self._where2sql(query.filter)
            })

            def first_item(sql):
                return self.db.column(sql)[0]

            return sql, first_item  # RETURNING SINGLE OBJECT WITH AGGREGATE VALUES
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:52,代码来源:__init__.py


示例4: forall

    def forall(self, sql, param=None, _execute=None):
        """
        Run `sql` and call `_execute` once for every row returned.

        Each row is handed over as a wrapped dict keyed by column name.
        When no cursor is open a temporary one is created for the read;
        it is released again whether or not the statement succeeds, so a
        failed read can not leave a stale cursor on self.cursor.

        :param sql: SQL text, optionally a template
        :param param: values to quote and expand into the `sql` template
        :param _execute: callable accepting a single row; required
        :return: number of rows handed to `_execute`
        """
        assert _execute
        num = 0

        self._execute_backlog()
        old_cursor = self.cursor
        try:
            if not old_cursor:  # ALLOW NON-TRANSACTIONAL READS
                self.cursor = self.db.cursor()

            if param:
                sql = expand_template(sql, quote_param(param))
            sql = self.preamble + outdent(sql)
            self.debug and Log.note("Execute SQL:\n{{sql}}", sql=indent(sql))
            self.cursor.execute(sql)

            columns = tuple([utf8_to_unicode(d[0]) for d in self.cursor.description])
            for r in self.cursor:
                num += 1
                _execute(wrap(dict(zip(columns, [utf8_to_unicode(c) for c in r]))))
        except Exception as e:
            Log.error("Problem executing SQL:\n{{sql|indent}}", sql=sql, cause=e, stack_depth=1)
        finally:
            # CLEANUP AFTER NON-TRANSACTIONAL READS, ALSO WHEN THE READ FAILED
            if not old_cursor and self.cursor:
                temp_cursor = self.cursor
                self.cursor = None  # FORGET IT FIRST, SO A FAILING close() CAN NOT LEAVE IT BEHIND
                temp_cursor.close()

        return num
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:29,代码来源:mysql.py


示例5: execute

    def execute(
        self,
        command,
        param=None,
        retry=True     # WHEN False, A FAILED command IS REPORTED WITH Log.error INSTEAD OF TRIED AGAIN
    ):
        """
        Run `command`, commit, and return the fetched rows (if any).

        After a failure the connection is rolled back, closed and
        re-established before the command is attempted again.

        NOTE(review): with retry=True there is no limit on the number of
        attempts; a command that always fails is repeated forever

        :param command: SQL text, optionally a template
        :param param: values to quote and expand into the `command` template
        :param retry: see the note on the parameter
        :return: result of fetchall() when the cursor reports a rowcount >= 0,
                 otherwise None
        """
        if param:
            command = expand_template(command, self.quote_param(param))

        output = None
        while True:
            try:
                with self.locker:
                    if not self.connection:
                        self._connect()

                with Closer(self.connection.cursor()) as cursor:
                    cursor.execute(command)
                    if cursor.rowcount >= 0:
                        output = cursor.fetchall()
                self.connection.commit()
                return output
            except Exception as e:
                # THROW THE BROKEN CONNECTION AWAY AND START OVER
                with suppress_exception:
                    self.connection.rollback()
                    # TODO: FIGURE OUT WHY rollback() DOES NOT HELP
                    self.connection.close()
                self.connection = None
                self._connect()
                if not retry:
                    Log.error("Problem with command:\n{{command|indent}}",  command= command, cause=e)
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:33,代码来源:redshift.py


示例6: column_query

    def column_query(self, sql, param=None):
        """
        RETURN RESULTS IN [column][row_num] GRID

        When no cursor is open a temporary one is used (after setting the
        session time zone to +00:00); it is released again whether or not
        the statement succeeds.

        :param sql: SQL text, optionally a template
        :param param: values to quote and expand into the `sql` template
        :return: the rows of the result, transposed
        """
        self._execute_backlog()
        old_cursor = self.cursor
        try:
            if not old_cursor:  # ALLOW NON-TRANSACTIONAL READS
                self.cursor = self.db.cursor()
                self.cursor.execute("SET TIME_ZONE='+00:00'")
                self.cursor.close()
                self.cursor = self.db.cursor()

            if param:
                sql = expand_template(sql, quote_param(param))
            sql = self.preamble + outdent(sql)
            self.debug and Log.note("Execute SQL:\n{{sql}}", sql=indent(sql))

            self.cursor.execute(sql)
            grid = [[utf8_to_unicode(c) for c in row] for row in self.cursor]
            return transpose(*grid)
        except Exception as e:
            # PYTHON 3 EXCEPTIONS HAVE NO .message; READING IT BLINDLY WOULD
            # RAISE AttributeError HERE AND HIDE THE REAL CAUSE
            description = getattr(e, "message", None)
            if not description:
                description = str(e)
            if isinstance(e, InterfaceError) or "InterfaceError" in description:
                Log.error("Did you close the db connection?", e)
            Log.error("Problem executing SQL:\n{{sql|indent}}", sql=sql, cause=e, stack_depth=1)
        finally:
            # CLEANUP AFTER NON-TRANSACTIONAL READS, ALSO WHEN THE READ FAILED
            if not old_cursor and self.cursor:
                temp_cursor = self.cursor
                self.cursor = None  # FORGET IT FIRST, SO A FAILING close() CAN NOT LEAVE IT BEHIND
                temp_cursor.close()
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:32,代码来源:mysql.py


示例7: write

 def write(self, template, params):
     """
     Expand `template` with `params` and append the result to the log file.

     A failed write is reported as a warning, followed by a five second
     pause; nothing is re-raised here.

     :param template: message template
     :param params: values for the template
     """
     try:
         with self.file_lock:
             self.file.append(expand_template(template, params))
     except Exception as e:
         # REPORT THE LOG FILE ITSELF: THE BARE NAME file IS A Python 2 BUILTIN
         # (AND UNDEFINED IN Python 3), NOT THE FILE BEING WRITTEN
         Log.warning("Problem writing to file {{file}}, waiting...", file=self.file.name, cause=e)
         time.sleep(5)
开发者ID:klahnakoski,项目名称:SpotManager,代码行数:7,代码来源:log_usingFile.py


示例8: _send_email

    def _send_email(self):
        """
        Email everything accumulated so far, then schedule the next send.

        Every message goes to settings.to_address; a message is also copied
        to a cc entry when any of that entry's `contains` values is found
        in the message's params.params.error. A failure is logged as a
        warning, and the next send time is set in every case.
        """
        try:
            if self.accumulation:
                with Emailer(self.settings) as emailer:
                    # WHO ARE WE SENDING TO
                    outbox = Data()
                    for template, params in self.accumulation:
                        message = expand_template(template, params)
                        outbox[literal_field(self.settings.to_address)] += [message]
                        for rule in self.cc:
                            if any(keyword in params.params.error for keyword in rule.contains):
                                outbox[literal_field(rule.to_address)] += [message]

                    # SEND TO EACH
                    for recipient, messages in outbox.items():
                        emailer.send_email(
                            from_address=self.settings.from_address,
                            to_address=listwrap(recipient),
                            subject=self.settings.subject,
                            text_data="\n\n".join(messages)
                        )

                self.accumulation = []
        except Exception as e:
            Log.warning("Could not send", e)
        finally:
            # RANDOMIZED INTERVAL: BETWEEN ZERO AND TWICE THE AVERAGE
            self.next_send = Date.now() + self.settings.average_interval * (2 * Random.float())
开发者ID:rv404674,项目名称:TUID,代码行数:28,代码来源:log_usingEmail.py


示例9: query

    def query(self, sql, param=None, stream=False, row_tuples=False):
        """
        RETURN LIST OF dicts

        :param sql: SQL text, optionally a template
        :param param: values to quote and expand into the `sql` template
        :param stream: when set, rows are produced lazily instead of
                       being collected into a list
        :param row_tuples: when set, rows are returned as the cursor
                           delivers them instead of as dicts
        :return: the rows, in the shape selected by `stream` and `row_tuples`
        """
        if not self.cursor:  # READS ARE ONLY ACCEPTED WHILE A CURSOR IS OPEN
            Log.error("must perform all queries inside a transaction")
        self._execute_backlog()

        try:
            if param:
                sql = expand_template(sql, quote_param(param))
            sql = self.preamble + outdent(sql)
            self.debug and Log.note("Execute SQL:\n{{sql}}", sql=indent(sql))

            self.cursor.execute(sql)
            if row_tuples:
                return self.cursor if stream else wrap(list(self.cursor))

            columns = [utf8_to_unicode(d[0]) for d in coalesce(self.cursor.description, [])]

            def as_record(row):
                # PAIR EACH VALUE WITH ITS COLUMN NAME
                return {c: utf8_to_unicode(v) for c, v in zip(columns, row)}

            if stream:
                return (wrap(as_record(row)) for row in self.cursor)
            return wrap([as_record(row) for row in self.cursor])
        except Exception as e:
            cause = Except.wrap(e)
            if "InterfaceError" in cause:
                Log.error("Did you close the db connection?", cause)
            Log.error("Problem executing SQL:\n{{sql|indent}}", sql=sql, cause=cause, stack_depth=1)
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:33,代码来源:mysql.py


示例10: quote_value

 def quote_value(self, value):
     """
     convert values to mysql code for the same
     mostly delegate directly to the mysql lib, but some exceptions exist

     :param value: the value to express as SQL
     :return: SQL for the value
     """
     try:
         if value == None:
             return SQL("NULL")

         if isinstance(value, SQL):
             if value.param:
                 quoted = {name: self.quote_sql(v) for name, v in value.param.items()}
                 return SQL(expand_template(value.template, quoted))
             # value.template CAN BE MORE THAN A TEMPLATE STRING
             return self.quote_sql(value.template)

         if isinstance(value, basestring):
             return SQL(self.db.literal(value))

         if isinstance(value, Mapping):
             # OBJECTS ARE STORED AS THEIR JSON TEXT
             return SQL(self.db.literal(json_encode(value)))

         if Math.is_number(value):
             return SQL(unicode(value))

         if isinstance(value, datetime):
             return SQL("str_to_date('" + value.strftime("%Y%m%d%H%M%S.%f") + "', '%Y%m%d%H%i%s.%f')")

         if isinstance(value, Date):
             return SQL("str_to_date('"+value.format("%Y%m%d%H%M%S.%f")+"', '%Y%m%d%H%i%s.%f')")

         if hasattr(value, '__iter__'):
             # OTHER ITERABLES ARE STORED AS THEIR JSON TEXT TOO
             return SQL(self.db.literal(json_encode(value)))

         # NOTE(review): the only path that does not wrap its result in SQL() - confirm intended
         return self.db.literal(value)
     except Exception as e:
         Log.error("problem quoting SQL", e)
开发者ID:klahnakoski,项目名称:SpotManager,代码行数:30,代码来源:mysql.py


示例11: _send_email

    def _send_email(self):
        """
        Send everything accumulated so far through the regional connection.

        Every message goes to settings.to_address; a message is also copied
        to a cc entry when any of that entry's `contains` values is found
        in the message's params.params.error. The next send is scheduled
        settings.max_interval from now, after success and after failure;
        nothing is scheduled when there was nothing to send.
        """
        try:
            if not self.accumulation:
                return

            connection = connect_to_region(
                self.settings.region,
                aws_access_key_id=unwrap(self.settings.aws_access_key_id),
                aws_secret_access_key=unwrap(self.settings.aws_secret_access_key)
            )
            with Closer(connection) as conn:

                # WHO ARE WE SENDING TO
                outbox = Data()
                for template, params in self.accumulation:
                    message = expand_template(template, params)
                    outbox[literal_field(self.settings.to_address)] += [message]
                    for rule in self.cc:
                        if any(keyword in params.params.error for keyword in rule.contains):
                            outbox[literal_field(rule.to_address)] += [message]

                # SEND TO EACH
                for recipient, messages in outbox.items():
                    conn.send_email(
                        source=self.settings.from_address,
                        to_addresses=listwrap(recipient),
                        subject=self.settings.subject,
                        body="\n\n".join(messages),
                        format="text"
                    )

            self.next_send = Date.now() + self.settings.max_interval
            self.accumulation = []
        except Exception as e:
            self.next_send = Date.now() + self.settings.max_interval
            Log.warning("Could not send", e)
开发者ID:klahnakoski,项目名称:SpotManager,代码行数:34,代码来源:log_usingSES.py


示例12: write

 def write(self, template, params):
     """
     Expand `template` with `params` and hand the text, followed by CR,
     to self.writer while holding self.locker.

     :param template: message template
     :param params: values for the template
     """
     line = expand_template(template, params) + CR
     lock = self.locker
     lock.acquire()
     try:
         self.writer(line)
     finally:
         # ALWAYS GIVE THE LOCK BACK, EVEN WHEN THE WRITER FAILS
         lock.release()
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:7,代码来源:log_usingStream.py


示例13: to_es_script

    def to_es_script(self, schema, not_null=False, boolean=False, many=True):
        """
        Build the script that renders this operator's term as a string.

        :param schema: handed to the term's own to_es_script()
        :param not_null: not read by this implementation
        :param boolean: not read by this implementation
        :param many: not read by this implementation
        :return: a script of STRING type; the term's own script when it
                 already is of STRING type
        """
        first = FirstOp(self.term).partial_eval()
        value = first.to_es_script(schema)

        if is_op(value.frum, CoalesceOp_):
            # CONVERT EACH ALTERNATIVE TO STRING, THEN COALESCE THOSE
            alternatives = [StringOp(t).partial_eval() for t in value.frum.terms]
            return CoalesceOp(alternatives).to_es_script(schema)

        if value.miss is TRUE or value.type is IS_NULL:
            return empty_string_script

        # PICK THE EXPRESSION THAT TURNS THE VALUE INTO TEXT
        if value.type == BOOLEAN:
            expr = value.expr + ' ? "T" : "F"'
        elif value.type == INTEGER:
            expr = "String.valueOf(" + value.expr + ")"
        elif value.type == NUMBER or not (value.type == STRING):
            # NUMBER, AND ANY TYPE NOT LISTED HERE, SHARE THE SAME TEMPLATE
            expr = expand_template(NUMBER_TO_STRING, {"expr": value.expr})
        else:
            return value  # ALREADY A STRING

        return EsScript(
            miss=self.term.missing().partial_eval(),
            type=STRING,
            expr=expr,
            frum=self,
            schema=schema,
        )
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:45,代码来源:painless.py


示例14: execute

    def execute(self, sql, param=None):
        """
        Queue `sql` on the backlog, to be run as part of a batch.

        The backlog is run right away when debugging, or once it holds
        MAX_BATCH_SIZE statements.

        :param sql: SQL text, optionally a template
        :param param: values to quote and expand into the `sql` template
        """
        if self.transaction_level == 0:
            Log.error("Expecting transaction to be started before issuing queries")

        statement = expand_template(sql, quote_param(param)) if param else sql
        self.backlog.append(outdent(statement))

        if self.debug or len(self.backlog) >= MAX_BATCH_SIZE:
            self._execute_backlog()
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:10,代码来源:mysql.py


示例15: append_query

    def append_query(self, es_query, start):
        """
        Wrap `es_query` in a scripted terms aggregation named "_match".

        :param es_query: the query to nest under the new aggregation
        :param start: remembered on self.start
        :return: the new, wrapped, query
        """
        self.start = start

        # FIRST LEAF COLUMN OF THE VARIABLE PROVIDES THE ES FIELD
        column = self.query.frum.schema.leaves(self.var)[0]
        script = expand_template(LIST_TO_PIPE, {"expr": 'doc[' + quote(column.es_column) + '].values'})
        match = set_default({"terms": {"script":  script}}, es_query)

        return wrap({"aggs": {"_match": match}})
开发者ID:rv404674,项目名称:TUID,代码行数:11,代码来源:decoders.py


示例16: table2csv

def table2csv(table_data):
    """
    Render a table as comma separated text with aligned columns.

    :param table_data: expecting a list of tuples
    :return: text in nice formatted csv
    """
    # EVERY CELL AS (PRETTY) JSON TEXT
    rows = [tuple(value2json(cell, pretty=True) for cell in row) for row in table_data]

    # EACH COLUMN IS AS WIDE AS ITS LONGEST CELL
    widths = [max(len(cell) for cell in column) for column in zip(*rows)]

    fields = []
    for index, width in enumerate(widths):
        fields.append("{{" + unicode(index) + "|left_align(" + unicode(width) + ")}}")
    template = ", ".join(fields)

    return "\n".join(expand_template(template, row) for row in rows)
开发者ID:klahnakoski,项目名称:SpotManager,代码行数:14,代码来源:convert.py


示例17: execute_sql

def execute_sql(
    host,
    username,
    password,
    sql,
    schema=None,
    param=None,
    kwargs=None
):
    """
    EXECUTE MANY LINES OF SQL (FROM SQLDUMP FILE, MAYBE?)

    The SQL is piped into the "mysql" command line client; a non-zero
    return code is reported with Log.error.

    NOTE(review): kwargs defaults to None but is dereferenced on the first
    line - presumably a decorator outside this view fills it in; confirm
    NOTE(review): the password is passed on the command line

    :param host: given to the client as -h
    :param username: given to the client as -u
    :param password: given to the client as -p
    :param sql: SQL text, optionally a template
    :param schema: when given, passed to the client as the database name
    :param param: values to quote and expand into the `sql` template
    :param kwargs: connection settings, used to open MySQL while expanding `param`
    """
    kwargs.schema = coalesce(kwargs.schema, kwargs.database)

    if param:
        with MySQL(kwargs) as _:
            sql = expand_template(sql, quote_param(param))

    # We have no way to execute an entire SQL file in bulk, so we
    # have to shell out to the commandline client.
    command = [
        "mysql",
        "-h{0}".format(host),
        "-u{0}".format(username),
        "-p{0}".format(password)
    ] + (["{0}".format(schema)] if schema else [])

    try:
        proc = subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=-1
        )
        if is_text(sql):
            sql = sql.encode("utf8")
        (output, _) = proc.communicate(sql)
    except Exception as e:
        raise Log.error("Can not call \"mysql\"", e)

    if not proc.returncode:
        return

    # DO NOT FLOOD THE LOG WITH A HUGE SCRIPT
    if len(sql) > 10000:
        sql = "<" + text_type(len(sql)) + " bytes of sql>"
    Log.error(
        "Unable to execute sql: return code {{return_code}}, {{output}}:\n {{sql}}\n",
        sql=indent(sql),
        return_code=proc.returncode,
        output=output
    )
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:50,代码来源:mysql.py


示例18: inner

        def inner(changeset_id):
            """
            Return the diff of a changeset as json: from ES when it has one,
            otherwise fetched from DIFF_URL and converted.

            :param changeset_id: prefix of the changeset id to look for
            :return: the json diff; None for merge changesets, for an empty
                     diff, and when the fetch fails
            """
            criteria = [
                {"prefix": {"changeset.id": changeset_id}},
                {"range": {"etl.timestamp": {"gt": MIN_ETL_AGE}}}
            ]
            if self.es.cluster.version.startswith("1.7."):
                query = {
                    "query": {"filtered": {
                        "query": {"match_all": {}},
                        "filter": {"and": criteria}
                    }},
                    "size": 1
                }
            else:
                query = {
                    "query": {"bool": {"must": criteria}},
                    "size": 1
                }

            try:
                # ALWAYS TRY ES FIRST
                with self.es_locker:
                    found = self.es.search(query)
                    json_diff = found.hits.hits[0]._source.changeset.diff
                if json_diff:
                    return json_diff
            except Exception as e:
                # BEST EFFORT ONLY: ANY PROBLEM HERE MEANS WE FETCH THE DIFF BELOW
                pass

            url = expand_template(DIFF_URL, {"location": revision.branch.url, "rev": changeset_id})
            DEBUG and Log.note("get unified diff from {{url}}", url=url)
            try:
                response = http.get(url)
                diff = response.content.decode("utf8")
                json_diff = diff_to_json(diff)
                num_changes = _count(c for f in json_diff for c in f.changes)
                if not json_diff:
                    return None
                if revision.changeset.description.startswith("merge "):
                    return None  # IGNORE THE MERGE CHANGESETS
                if num_changes < MAX_DIFF_SIZE:
                    return json_diff

                # TOO BIG: KEEP THE LIST OF FILES, DROP THEIR CHANGES
                Log.warning("Revision at {{url}} has a diff with {{num}} changes, ignored", url=url, num=num_changes)
                for changed_file in json_diff:
                    changed_file.changes = None
                return json_diff
            except Exception as e:
                Log.warning("could not get unified diff from {{url}}", url=url, cause=e)
开发者ID:rv404674,项目名称:TUID,代码行数:50,代码来源:hg_mozilla_org.py


示例19: to_es_script

def to_es_script(self, schema):
    """
    Build the script that renders this operator's term as a string.

    :param schema: handed to the term's own to_es_script()
    :return: a script of STRING type; the term's own script when it
             already is of STRING type
    """
    first = FirstOp("first", self.term).partial_eval()
    value = first.to_es_script(schema)

    if isinstance(value.frum, CoalesceOp):
        # CONVERT EACH ALTERNATIVE TO STRING, THEN COALESCE THOSE
        alternatives = [StringOp("string", t).partial_eval() for t in value.frum.terms]
        return CoalesceOp("coalesce", alternatives).to_es_script(schema)

    # PICK THE EXPRESSION THAT TURNS THE VALUE INTO TEXT
    if value.type == BOOLEAN:
        expr = value.expr + ' ? "T" : "F"'
    elif value.type == INTEGER:
        expr = "String.valueOf(" + value.expr + ")"
    elif value.type == NUMBER or not (value.type == STRING):
        # NUMBER, AND ANY TYPE NOT LISTED HERE, SHARE THE SAME TEMPLATE
        expr = expand_template(TO_STRING, {"expr":value.expr})
    else:
        return value  # ALREADY A STRING

    return EsScript(
        miss=self.term.missing().partial_eval(),
        type=STRING,
        expr=expr,
        frum=self
    )
开发者ID:rv404674,项目名称:TUID,代码行数:37,代码来源:expressions.py


示例20: compileString2Term

def compileString2Term(edge):
    """
    Build the pair of converters for an edge whose value is a variable name.

    :param edge: the edge to compile; edge scripts are not supported
    :return: Data with `toTerm` (head and body of the script reading the
             value) and `fromTerm` (function mapping a term back to the
             part of the edge's domain)
    """
    if edge.esscript:
        Log.error("edge script not supported yet")

    body = edge.value
    if not is_variable_name(body):
        Log.error("not handled")
    else:
        body = strings.expand_template("getDocValue({{path}})", {"path": quote(body)})

    def fromTerm(value):
        # LOOK THE TERM UP IN THE DOMAIN OF THE EDGE
        return edge.domain.getPartByKey(value)

    return Data(
        toTerm={"head": "", "body": body},
        fromTerm=fromTerm
    )
开发者ID:rv404674,项目名称:TUID,代码行数:17,代码来源:util.py



注:本文中的mo_logs.strings.expand_template函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python mo_math.Math类代码示例发布时间:2022-05-27
下一篇:
Python mo_logs.Log类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap