• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python textutils.json_decode函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中textutils.json_decode函数的典型用法代码示例。如果您正苦于以下问题：Python json_decode函数的具体用法？Python json_decode怎么用？Python json_decode使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了json_decode函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: __call__

    def __call__(self, req, db, user):
        """Decode the JSON input of a request, type-check it and run the operation.

        The input is the request body for POST requests and the "data"
        parameter otherwise.  It is decoded with json_decode(), passed to the
        configured checker, and then expanded as keyword arguments to
        self.process().

        Returns whatever self.process() returns, or an OperationError /
        OperationFailure object describing the problem when the checker or
        self.process() raises.  Returns OperationFailureMustLogin() for an
        anonymous user when the operation does not accept anonymous users.

        Raises OperationError when there is no input or the input is not
        valid JSON (these two are raised, not returned).
        """
        from operation.typechecker import TypeCheckerContext

        if user.isAnonymous() and not self.__accept_anonymous_user:
            return OperationFailureMustLogin()

        # POST requests carry the JSON in the body; other methods in "data".
        if req.method == "POST": data = req.read()
        else: data = req.getParameter("data")

        if not data: raise OperationError("no input")

        try: value = json_decode(data)
        except ValueError as error: raise OperationError("invalid input: %s" % str(error))

        try:
            self.__checker(value, TypeCheckerContext(req, db, user))
            return self.process(db, user, **value)
        except OperationError as error:
            # Inside this block, operation errors are returned rather than
            # propagated.
            return error
        except OperationFailure as failure:
            return failure
        except dbutils.NoSuchUser as error:
            return OperationFailure(code="nosuchuser",
                                    title="Who is '%s'?" % error.name,
                                    message="There is no user in Critic's database named that.")
        except dbutils.NoSuchReview as error:
            return OperationFailure(code="nosuchreview",
                                    title="Invalid review ID",
                                    message="The review ID r/%d is not valid." % error.id)
        except dbutils.TransactionRollbackError:
            return OperationFailure(code="transactionrollback",
                                    title="Transaction rolled back",
                                    message="Your database transaction rolled back, probably due to a deadlock.  Please try again.")
        except:
            # Catch-all for unexpected failures: build a report, roll back the
            # transaction, and tell the user something useful.
            # NOTE(review): a bare except also intercepts KeyboardInterrupt and
            # SystemExit -- confirm that is intended.

            # Decode value again since the type checkers might have modified it.
            value = json_decode(data)

            error_message = ("User: %s\nReferrer: %s\nData: %s\n\n%s"
                             % (user.name,
                                req.getReferrer(),
                                json_encode(self.sanitize(value), indent=2),
                                traceback.format_exc()))

            db.rollback()

            import mailutils
            import configuration

            # Users with the "developer" role get the details in the response
            # below instead of by mail.
            if not user.hasRole(db, "developer"):
                mailutils.sendExceptionMessage(db, "wsgi[%s]" % req.path, error_message)

            if configuration.debug.IS_DEVELOPMENT or user.hasRole(db, "developer"):
                return OperationError(error_message)
            else:
                return OperationError("An unexpected error occurred.  " +
                                      "A message has been sent to the system administrator(s) " +
                                      "with details about the problem.")
开发者ID:andreastt,项目名称:critic,代码行数:57,代码来源:__init__.py


示例2: handle_input

        def handle_input(self, data):
            try:
                result = json_decode(data)
            except ValueError:
                result = { "status": "error",
                           "error": ("invalid response:\n" +
                                     background.utils.indent(data)) }
            if result["status"] == "ok":
                for item in result["info"]:
                    self.server.info(item)
                if result["output"]:
                    self.__client.write(result["output"].strip() + "\n")
                if result["accept"]:
                    self.__client.write("ok\n")
            elif result["status"] == "reject":
                self.server.warning(result["message"])
                self.__client.write(result["message"].strip() + "\n")
            else:
                self.server.error(result["error"])
                self.__client.write("""\
An exception was raised while processing the request.  A message has
been sent to the system administrator(s).
""")
                if configuration.debug.IS_DEVELOPMENT:
                    self.__client.write("\n" + result["error"].strip() + "\n")
            self.__client.close()
开发者ID:Haster2004,项目名称:critic,代码行数:26,代码来源:githook.py


示例3: process

    def process(self, db, user, service_name):
        """Restart a background service on behalf of an administrator.

        For "wsgi", SIGINT is sent to every process whose PID appears as an
        entry in configuration.paths.WSGI_PIDFILE_DIR.  For any other service
        a "restart" command is sent to the service manager over its UNIX
        socket and its JSON reply is inspected.

        Returns OperationResult() on success.

        Raises OperationFailure if |user| lacks the "administrator" role, and
        OperationError carrying the service manager's error text if the
        restart was not acknowledged with status "ok".
        """
        if not user.hasRole(db, "administrator"):
            raise OperationFailure(
                code="notallowed", title="Not allowed!", message="Only a system administrator can restart services."
            )

        if service_name == "wsgi":
            for pid in os.listdir(configuration.paths.WSGI_PIDFILE_DIR):
                try:
                    os.kill(int(pid), signal.SIGINT)
                except (ValueError, OverflowError, OSError):
                    # Best effort: skip entries that are not PIDs and
                    # processes that cannot be signalled (already gone, not
                    # permitted), but let programming errors surface.
                    pass
            return OperationResult()

        connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            connection.connect(configuration.services.SERVICEMANAGER["address"])
            # sendall() keeps writing until the whole request is transmitted;
            # plain send() may stop after a partial write.
            connection.sendall(textutils.json_encode({"command": "restart", "service": service_name}))
            connection.shutdown(socket.SHUT_WR)

            chunks = []
            while True:
                received = connection.recv(4096)
                if not received:
                    break
                chunks.append(received)
        finally:
            # Release the socket on every path, including errors.
            connection.close()

        result = textutils.json_decode("".join(chunks))

        if result["status"] == "ok":
            return OperationResult()

        raise OperationError(result["error"])
开发者ID:kolorowestudio,项目名称:critic,代码行数:32,代码来源:servicemanager.py


示例4: handle_input

 def handle_input(self, value):
     """Decode a child's response and hand the result to every waiting client.

     If the response is not valid JSON, the problem is logged and a copy of
     the original request with the raw response stored under "error" is used
     as the result instead.
     """
     try:
         outcome = json_decode(value)
     except ValueError:
         self.server.error("invalid response:\n" + indent(value))
         outcome = self.request.copy()
         outcome["error"] = value

     for listener in self.clients:
         listener.add_result(outcome)

     self.server.request_finished(self, self.request, outcome)
开发者ID:jherland,项目名称:critic,代码行数:8,代码来源:utils.py


示例5: perform_job

    def perform_job():
        """Read one JSON highlight request from stdin and echo it with the result.

        The value returned by generateHighlight() is stored under the
        "highlighted" key and the whole request is written to stdout as JSON.
        """
        import syntaxhighlight.generate

        job = json_decode(sys.stdin.read())

        outcome = syntaxhighlight.generate.generateHighlight(
            repository_path=job["repository_path"],
            sha1=job["sha1"],
            language=job["language"])
        job["highlighted"] = outcome

        sys.stdout.write(json_encode(job))
开发者ID:ahockersten,项目名称:critic,代码行数:9,代码来源:highlight.py


示例6: handle_input

        def handle_input(self, _file, data):
            """Start an extension run for the JSON request received in |data|.

            The request's "flavor" selects the process, "timeout" is passed to
            the Extension object and "stdin" is written to it before it is
            closed and registered with the server as a peer.
            """
            request = textutils.json_decode(data)

            flavor = request["flavor"]
            process = self.server.get_process(flavor)

            runner = ExtensionRunner.Extension(
                self.server, self, process, request["timeout"])
            runner.write(request["stdin"])
            runner.close()

            self.server.add_peer(runner)
开发者ID:jensl,项目名称:critic,代码行数:11,代码来源:extensionrunner.py


示例7: handle_input

            def handle_input(self, data):
                """Forward the wait-for-update child's response to the client.

                |data| is the child's raw output, expected to be a JSON object
                with a "status" key.  With status "output" the "output" value
                is written to the client; the other statuses are only logged.
                The client is closed exactly once in every case.
                """
                try:
                    data = json_decode(data)
                except ValueError:
                    self.server.error("invalid response from wait-for-update child: %r" % data)
                    self.client.close()
                    # Stop here: |data| is still the raw string, so indexing it
                    # below would raise TypeError, and the client is already
                    # closed.
                    return

                if data["status"] == "output":
                    self.client.write(data["output"])
                    self.server.debug("  hook output written to client")
                elif data["status"] == "no-output":
                    self.server.debug("  update produced no hook output")
                else:
                    self.server.debug("  timeout")

                self.client.close()
开发者ID:Aessy,项目名称:critic,代码行数:15,代码来源:branchtrackerhook.py


示例8: __call__

    def __call__(self, req, db, user):
        """Decode the JSON input of a request, check it and run the operation.

        The input is the request body for POST requests and the "data"
        parameter otherwise.  The decoded value is validated by the configured
        checker and expanded as keyword arguments to self.process().

        Returns the result of self.process(), an OperationFailure raised by
        it, or an OperationFailure describing an unknown user.  Returns
        OperationFailureMustLogin() for an anonymous user when the operation
        does not accept anonymous users.

        Raises OperationError when there is no input, when the input is not
        valid JSON, or when self.process() raises one.
        """
        if user.isAnonymous() and not self.__accept_anonymous_user:
            return OperationFailureMustLogin()

        if req.method == "POST":
            data = req.read()
        else:
            data = req.getParameter("data")

        if not data:
            raise OperationError("no input")

        try:
            value = json_decode(data)
        except ValueError as error:
            raise OperationError("invalid input: %s" % str(error))

        self.__checker(value)

        try:
            return self.process(db, user, **value)
        except OperationError:
            # Operation errors propagate to the caller unchanged.
            raise
        except OperationFailure as failure:
            return failure
        except dbutils.NoSuchUser as error:
            return OperationFailure(code="nosuchuser",
                                    title="Who is '%s'?" % error.name,
                                    message="There is no user in Critic's database named that.")
开发者ID:suquant,项目名称:critic,代码行数:21,代码来源:__init__.py


示例9: requestChangesets

def requestChangesets(requests):
    """Send |requests| to the changeset background service and check the reply.

    The requests are JSON-encoded and sent over the service's UNIX socket;
    the complete response is read until the service closes the connection.

    Raises ChangesetBackgroundServiceError if the socket communication fails,
    if the response is not valid JSON or not a list, if the number of results
    differs from the number of requests, or if any result contains an "error"
    entry.
    """
    try:
        connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            connection.connect(configuration.services.CHANGESET["address"])
            # sendall() keeps writing until the whole request is transmitted;
            # plain send() may stop after a partial write.
            connection.sendall(json_encode(requests))
            connection.shutdown(socket.SHUT_WR)

            chunks = []
            while True:
                received = connection.recv(4096)
                if not received:
                    break
                chunks.append(received)
        finally:
            # Close the socket on failure as well as on success.
            connection.close()
    except socket.error as error:
        raise ChangesetBackgroundServiceError(error[1])

    data = "".join(chunks)

    try:
        results = json_decode(data)
    except ValueError:
        raise ChangesetBackgroundServiceError(
            "returned an invalid response: %r" % data)

    if not isinstance(results, list):
        # If not a list, the result is probably an error message.
        raise ChangesetBackgroundServiceError(str(results))

    if len(results) != len(requests):
        raise ChangesetBackgroundServiceError("didn't process all requests")

    errors = [result["error"] for result in results if "error" in result]

    if errors:
        raise ChangesetBackgroundServiceError(
            "one or more requests failed:\n%s" % "\n".join(map(indent, errors)))
开发者ID:Aessy,项目名称:critic,代码行数:40,代码来源:client.py


示例10: requestHighlights

def requestHighlights(repository, sha1s):
    """Ask the highlight background service to highlight not-yet-cached blobs.

    |sha1s| maps each SHA-1 to a (path, language) pair.  Entries for which
    syntaxhighlight.isHighlighted() is already true are skipped; if nothing
    remains the function returns without contacting the service.

    Raises HighlightBackgroundServiceError if the socket communication fails,
    if the response is not valid JSON or not a list, or if the number of
    results differs from the number of requests.
    """
    requests = [{ "repository_path": repository.path, "sha1": sha1, "path": path, "language": language }
                for sha1, (path, language) in sha1s.items()
                if not syntaxhighlight.isHighlighted(sha1, language)]

    if not requests:
        return

    try:
        connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            connection.connect(configuration.services.HIGHLIGHT["address"])
            # sendall() keeps writing until the whole request is transmitted;
            # plain send() may stop after a partial write.
            connection.sendall(json_encode(requests))
            connection.shutdown(socket.SHUT_WR)

            chunks = []
            while True:
                received = connection.recv(4096)
                if not received:
                    break
                chunks.append(received)
        finally:
            # Close the socket on failure as well as on success.
            connection.close()
    except socket.error as error:
        raise HighlightBackgroundServiceError(error[1])

    data = "".join(chunks)

    try:
        results = json_decode(data)
    except ValueError:
        raise HighlightBackgroundServiceError(
            "returned an invalid response (%r)" % data)

    if not isinstance(results, list):
        # If not a list, the result is probably an error message.
        raise HighlightBackgroundServiceError(str(results))

    if len(results) != len(requests):
        raise HighlightBackgroundServiceError("didn't process all requests")
开发者ID:Aessy,项目名称:critic,代码行数:36,代码来源:request.py


示例11: perform_job

    def perform_job():
        soft_limit, hard_limit = getrlimit(RLIMIT_RSS)
        rss_limit = configuration.services.CHANGESET["rss_limit"]
        if soft_limit < rss_limit:
            setrlimit(RLIMIT_RSS, (rss_limit, hard_limit))

        from changeset.create import createChangeset

        request = json_decode(sys.stdin.read())

        try:
            db = dbutils.Database()

            createChangeset(db, request)

            db.close()

            sys.stdout.write(json_encode(request))
        except:
            print "Request:"
            print json_encode(request, indent=2)
            print

            print_exc(file=sys.stdout)
开发者ID:Aessy,项目名称:critic,代码行数:24,代码来源:changeset.py


示例12: str

    except gitutils.GitReferenceError:
        return "invalid commit id"
    except Exception as exception:
        return str(exception)

HANDLERS = { "propagate-comment": propagateComment }

try:
    if len(sys.argv) > 1:
        init()

        for command in sys.argv[1:]:
            pending_mails = None

            if command == "generate-mails-for-batch":
                data = json_decode(sys.stdin.readline())
                batch_id = data["batch_id"]
                was_accepted = data["was_accepted"]
                is_accepted = data["is_accepted"]
                pending_mails = reviewing.utils.generateMailsForBatch(db, batch_id, was_accepted, is_accepted)
            elif command == "generate-mails-for-assignments-transaction":
                data = json_decode(sys.stdin.readline())
                transaction_id = data["transaction_id"]
                pending_mails = reviewing.utils.generateMailsForAssignmentsTransaction(db, transaction_id)
            elif command == "apply-filters":
                data = json_decode(sys.stdin.readline())
                filters = reviewing.filters.Filters()
                user = dbutils.User.fromId(db, data["user_id"]) if "user_id" in data else None
                if "review_id" in data:
                    review = dbutils.Review.fromId(db, data["review_id"])
                    filters.setFiles(db, review=review)
开发者ID:Aessy,项目名称:critic,代码行数:31,代码来源:cli.py


示例13: renderCreateReview

def renderCreateReview(req, db, user):
    if user.isAnonymous(): raise page.utils.NeedLogin(req)

    repository = req.getParameter("repository", filter=gitutils.Repository.FromParameter(db), default=None)
    applyparentfilters = req.getParameter("applyparentfilters", "yes" if user.getPreference(db, 'review.applyUpstreamFilters') else "no") == "yes"

    cursor = db.cursor()

    if req.method == "POST":
        data = json_decode(req.read())

        summary = data.get("summary")
        description = data.get("description")
        review_branch_name = data.get("review_branch_name")
        commit_ids = data.get("commit_ids")
        commit_sha1s = data.get("commit_sha1s")
    else:
        summary = req.getParameter("summary", None)
        description = req.getParameter("description", None)
        review_branch_name = req.getParameter("reviewbranchname", None)

        commit_ids = None
        commit_sha1s = None

        commits_arg = req.getParameter("commits", None)
        remote = req.getParameter("remote", None)
        upstream = req.getParameter("upstream", "master")
        branch_name = req.getParameter("branch", None)

        if commits_arg:
            try: commit_ids = map(int, commits_arg.split(","))
            except: commit_sha1s = [repository.revparse(ref) for ref in commits_arg.split(",")]
        elif branch_name:
            cursor.execute("""SELECT commit
                                FROM reachable
                                JOIN branches ON (branch=id)
                               WHERE repository=%s
                                 AND name=%s""",
                           (repository.id, branch_name))
            commit_ids = [commit_id for (commit_id,) in cursor]

            if len(commit_ids) > configuration.limits.MAXIMUM_REVIEW_COMMITS:
                raise page.utils.DisplayMessage(
                    "Too many commits!",
                    (("<p>The branch <code>%s</code> contains %d commits.  Reviews can"
                      "be created from branches that contain at most %d commits.</p>"
                      "<p>This limit can be adjusted by modifying the system setting"
                      "<code>configuration.limits.MAXIMUM_REVIEW_COMMITS</code>.</p>")
                     % (htmlutils.htmlify(branch_name), len(commit_ids),
                        configuration.limits.MAXIMUM_REVIEW_COMMITS)),
                    html=True)
        else:
            return renderSelectSource(req, db, user)

    req.content_type = "text/html; charset=utf-8"

    if commit_ids:
        commits = [gitutils.Commit.fromId(db, repository, commit_id) for commit_id in commit_ids]
    elif commit_sha1s:
        commits = [gitutils.Commit.fromSHA1(db, repository, commit_sha1) for commit_sha1 in commit_sha1s]
    else:
        commits = []

    if not commit_ids:
        commit_ids = [commit.getId(db) for commit in commits]
    if not commit_sha1s:
        commit_sha1s = [commit.sha1 for commit in commits]

    if summary is None:
        if len(commits) == 1:
            summary = commits[0].summary()
        else:
            summary = ""

    if review_branch_name:
        invalid_branch_name = "false"
        default_branch_name = review_branch_name
    else:
        invalid_branch_name = htmlutils.jsify(user.name + "/")
        default_branch_name = user.name + "/"

        match = re.search("(?:^|[Ff]ix(?:e[ds])?(?: +for)?(?: +bug)? +)([A-Z][A-Z0-9]+-[0-9]+)", summary)
        if match:
            invalid_branch_name = "false"
            default_branch_name = htmlutils.htmlify(match.group(1))

    changesets = []
    changeset_utils.createChangesets(db, repository, commits)
    for commit in commits:
        changesets.extend(changeset_utils.createChangeset(db, None, repository, commit, do_highlight=False))
    changeset_ids = [changeset.id for changeset in changesets]

    all_reviewers, all_watchers = reviewing.utils.getReviewersAndWatchers(
        db, repository, changesets=changesets, applyparentfilters=applyparentfilters)

    document = htmlutils.Document(req)
    html = document.html()
    head = html.head()

    document.addInternalScript(user.getJS(db))
#.........这里部分代码省略.........
开发者ID:ahockersten,项目名称:critic,代码行数:101,代码来源:createreview.py


示例14: render

                delay += delay
            else: raise

    if not connected:
        raise page.utils.DisplayMessage, "Service manager not responding!"

    connection.send(textutils.json_encode({ "query": "status" }))
    connection.shutdown(socket.SHUT_WR)

    data = ""
    while True:
        received = connection.recv(4096)
        if not received: break
        data += received

    result = textutils.json_decode(data)

    if result["status"] == "error":
        raise page.utils.DisplayMessage, result["error"]

    paleyellow = page.utils.PaleYellowTable(body, "Services")

    def render(target):
        table = target.table("services", cellspacing=0, align="center")

        headings = table.tr("headings")
        headings.th("name").text("Name")
        headings.th("module").text("Module")
        headings.th("pid").text("PID")
        headings.th("rss").text("RSS")
        headings.th("cpu").text("CPU")
开发者ID:ryfow,项目名称:critic,代码行数:31,代码来源:services.py


示例15: getrlimit

from dbutils import Database

# Job mode: when "--json-job" is given on the command line, read a single
# JSON request from stdin, create the changeset it describes and write the
# request back to stdout as JSON.
if "--json-job" in sys.argv[1:]:
    from resource import getrlimit, setrlimit, RLIMIT_RSS
    from traceback import print_exc

    # Raise the soft RSS limit to the configured value if it is lower; the
    # hard limit is left as it is.
    soft_limit, hard_limit = getrlimit(RLIMIT_RSS)
    rss_limit = configuration.services.CHANGESET["rss_limit"]
    if soft_limit < rss_limit:
        setrlimit(RLIMIT_RSS, (rss_limit, hard_limit))

    from changeset.create import createChangeset
    from textutils import json_decode, json_encode

    request = json_decode(sys.stdin.read())

    try:
        db = Database()

        createChangeset(db, request)

        db.close()

        sys.stdout.write(json_encode(request))
    except:
        # On any failure, print the request and the traceback to stdout in
        # place of the JSON response.
        # NOTE(review): db.close() is skipped on this path -- confirm that
        # process exit is relied on for cleanup.
        print "Request:"
        print json_encode(request, indent=2)
        print

        print_exc(file=sys.stdout)
开发者ID:KurSh,项目名称:critic,代码行数:30,代码来源:changeset.py


示例16: executeProcess

def executeProcess(db, manifest, role_name, script, function, extension_id,
                   user_id, argv, timeout, stdin=None, rlimit_rss=256):
    # If |user_id| is not the same as |db.user|, then one user's access of the
    # system is triggering an extension on behalf of another user.  This will
    # for instance happen when one user is adding changes to a review,
    # triggering an extension filter hook set up by another user.
    #
    # In this case, we need to check that the other user can access the
    # extension.
    #
    # If |user_id| is the same as |db.user|, we need to use |db.profiles|, which
    # may contain a profile associated with an access token that was used to
    # authenticate the user.
    if user_id != db.user.id:
        user = dbutils.User.fromId(db, user_id)
        authentication_labels = auth.DATABASE.getAuthenticationLabels(user)
        profiles = [auth.AccessControlProfile.forUser(
            db, user, authentication_labels)]
    else:
        authentication_labels = db.authentication_labels
        profiles = db.profiles

    extension = Extension.fromId(db, extension_id)
    if not auth.AccessControlProfile.isAllowedExtension(
            profiles, "execute", extension):
        raise auth.AccessDenied("Access denied to extension: execute %s"
                                % extension.getKey())

    flavor = manifest.flavor

    if manifest.flavor not in configuration.extensions.FLAVORS:
        flavor = configuration.extensions.DEFAULT_FLAVOR

    stdin_data = "%s\n" % json_encode({
            "library_path": configuration.extensions.FLAVORS[flavor]["library"],
            "rlimit": { "rss": rlimit_rss },
            "hostname": configuration.base.HOSTNAME,
            "dbname": configuration.database.PARAMETERS["database"],
            "dbuser": configuration.database.PARAMETERS["user"],
            "git": configuration.executables.GIT,
            "python": configuration.executables.PYTHON,
            "python_path": "%s:%s" % (configuration.paths.CONFIG_DIR,
                                      configuration.paths.INSTALL_DIR),
            "repository_work_copy_path": configuration.extensions.WORKCOPY_DIR,
            "changeset_address": configuration.services.CHANGESET["address"],
            "branchtracker_pid_path": configuration.services.BRANCHTRACKER["pidfile_path"],
            "maildelivery_pid_path": configuration.services.MAILDELIVERY["pidfile_path"],
            "is_development": configuration.debug.IS_DEVELOPMENT,
            "extension_path": manifest.path,
            "extension_id": extension_id,
            "user_id": user_id,
            "authentication_labels": list(authentication_labels),
            "role": role_name,
            "script_path": script,
            "fn": function,
            "argv": argv })

    if stdin is not None:
        stdin_data += stdin

    # Double the timeout. Timeouts are primarily handled by the extension runner
    # service, which returns an error response on timeout. This deadline here is
    # thus mostly to catch the extension runner service itself timing out.
    deadline = time.time() + timeout * 2

    try:
        connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        connection.settimeout(max(0, deadline - time.time()))
        connection.connect(configuration.services.EXTENSIONRUNNER["address"])
        connection.sendall(json_encode({
            "stdin": stdin_data,
            "flavor": flavor,
            "timeout": timeout
        }))
        connection.shutdown(socket.SHUT_WR)

        data = ""

        while True:
            connection.settimeout(max(0, deadline - time.time()))
            try:
                received = connection.recv(4096)
            except socket.error as error:
                if error.errno == errno.EINTR:
                    continue
                raise
            if not received:
                break
            data += received

        connection.close()
    except socket.timeout as error:
        raise ProcessTimeout(timeout)
    except socket.error as error:
        raise ProcessError("failed to read response: %s" % error)

    try:
        data = json_decode(data)
    except ValueError as error:
        raise ProcessError("failed to decode response: %s" % error)
#.........这里部分代码省略.........
开发者ID:jensl,项目名称:critic,代码行数:101,代码来源:execute.py


示例17: process

 def process(value):
     """Return |value| stripped, JSON-decoding it if it is double-quoted.

     A value that both starts and ends with a double quote is passed to
     json_decode(); anything else, including an empty or whitespace-only
     value, is returned as the stripped string.
     """
     value = value.strip()
     # startswith/endswith are safe on an empty string, where indexing
     # value[0] would raise IndexError.
     if value.startswith('"') and value.endswith('"'):
         return json_decode(value)
     else:
         return value
开发者ID:andreastt,项目名称:critic,代码行数:6,代码来源:manifest.py


示例18: main

def main():
    """Run the commands named on the command line, one JSON line of input each.

    Command-line arguments: "-u USER_ID" (integer), "-l LABEL" (repeatable
    authentication label) and any number of command names.  For each command
    one line is read from stdin and decoded as JSON, and the command's
    result is written to stdout as one line.  An unknown command writes an
    error string and exits with status 0.  Any other exception is reported
    as a JSON-encoded traceback on stdout.  abort() is always called last.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument("-u", dest="user_id", type=int)
    parser.add_argument("-l", dest="auth_labels", action="append", default=[])
    parser.add_argument("command", nargs="*")

    arguments = parser.parse_args()

    try:
        init(arguments.user_id, arguments.auth_labels)

        for command in arguments.command:
            # Commands that generate mails set this; it is written out as
            # JSON after the command has run.
            pending_mails = None

            if command == "generate-mails-for-batch":
                data = json_decode(sys.stdin.readline())
                batch_id = data["batch_id"]
                was_accepted = data["was_accepted"]
                is_accepted = data["is_accepted"]
                pending_mails = reviewing.utils.generateMailsForBatch(db, batch_id, was_accepted, is_accepted)
            elif command == "generate-mails-for-assignments-transaction":
                data = json_decode(sys.stdin.readline())
                transaction_id = data["transaction_id"]
                pending_mails = reviewing.utils.generateMailsForAssignmentsTransaction(db, transaction_id)
            elif command == "apply-filters":
                data = json_decode(sys.stdin.readline())
                filters = reviewing.filters.Filters()
                user = dbutils.User.fromId(db, data["user_id"]) if "user_id" in data else None
                if "review_id" in data:
                    review = dbutils.Review.fromId(db, data["review_id"])
                    filters.setFiles(db, review=review)
                    filters.load(db, review=review, user=user,
                                 added_review_filters=data.get("added_review_filters", []),
                                 removed_review_filters=data.get("removed_review_filters", []))
                else:
                    repository = gitutils.Repository.fromId(db, data["repository_id"])
                    filters.setFiles(db, file_ids=data["file_ids"])
                    filters.load(db, repository=repository, recursive=data.get("recursive", False), user=user)
                sys.stdout.write(json_encode(filters.data) + "\n")
            elif command == "generate-custom-mails":
                pending_mails = []
                for data in json_decode(sys.stdin.readline()):
                    from_user = dbutils.User.fromId(db, data["sender"])
                    if data.get("recipients"):
                        recipients = [dbutils.User.fromId(db, user_id)
                                      for user_id in data["recipients"]]
                    else:
                        recipients = None
                    subject = data["subject"]
                    headers = data.get("headers")
                    body = data["body"]
                    if "review_id" in data:
                        review = dbutils.Review.fromId(db, data["review_id"])
                    else:
                        review = None
                    pending_mails.extend(sendCustomMail(
                        from_user, recipients, subject, headers, body, review))
            elif command == "set-review-state":
                data = json_decode(sys.stdin.readline())
                # The response is a single line: empty on success, otherwise
                # an error description.
                error = ""
                try:
                    user = dbutils.User.fromId(db, data["user_id"])
                    review = dbutils.Review.fromId(db, data["review_id"])
                    if review.state != data["old_state"]:
                        error = "invalid old state"
                    elif data["new_state"] == "open":
                        review.reopen(db, user)
                    elif data["new_state"] == "closed":
                        review.close(db, user)
                    elif data["new_state"] == "dropped":
                        review.drop(db, user)
                    else:
                        error = "invalid new state"
                except dbutils.NoSuchUser:
                    error = "invalid user id"
                except dbutils.NoSuchReview:
                    error = "invalid review id"
                except Exception as exception:
                    # Bind the exception to its own name: reusing |error| as
                    # the "as" target would leave |error| unbound after the
                    # handler on Python 3.
                    error = str(exception)
                sys.stdout.write(error + "\n")
            elif command in HANDLERS:
                data_in = json_decode(sys.stdin.readline())
                data_out = HANDLERS[command](data_in)
                sys.stdout.write(json_encode(data_out) + "\n")
            else:
                sys.stdout.write(json_encode("unknown command: %s" % command) + "\n")
                sys.exit(0)

            if pending_mails is not None:
                sys.stdout.write(json_encode(pending_mails) + "\n")

        finish()
    except Exception:
        sys.stdout.write(json_encode(traceback.format_exc()) + "\n")
    finally:
        # Runs on every path, including after finish() and on sys.exit().
        abort()
开发者ID:jensl,项目名称:critic,代码行数:97,代码来源:cli.py


示例19: processLine

def processLine(paths, line):
    try:
        command, value = line.split(" ", 1)
    except ValueError:
        raise InjectError("Invalid line in output: %r" % line)

    if command not in ("link", "script", "stylesheet", "preference"):
        raise InjectError("Invalid command: %r" % command)

    value = value.strip()

    try:
        value = json_decode(value)
    except ValueError:
        raise InjectError("Invalid JSON: %r" % value)

    def is_string(value):
        return isinstance(value, basestring)

    if command in ("script", "stylesheet") and not is_string(value):
        raise InjectError("Invalid value for %r: %r (expected string)"
                          % (command, value))
    elif command == "link":
        if isinstance(value, dict):
            if "label" not in value or not is_string(value["label"]):
                raise InjectError("Invalid value for %r: %r (expected attribute 'label' of type string)"
                                  % (command, value))
            elif "url" not in value or not is_string(value["url"]) or value["url"] is None:
                raise InjectError("Invalid value for %r: %r (expected attribute 'url' of type string or null)"
                                  % (command, value))
        # Alternatively support [label, url] (backwards compatibility).
        elif not isinstance(value, list) or len(value) != 2:
            raise InjectError("Invalid value for %r: %r (expected object { \"label\": LABEL, \"url\": URL })"
                              % (command, value))
        elif not is_string(value[0]):
            raise InjectError("Invalid value for %r: %r (expected string at array[0])"
                              % (command, value))
        elif not (is_string(value[1]) or value[1] is None):
            raise InjectError("Invalid value for %r: %r (expected string or null at array[1])"
                              % (command, value))
        else:
            value = { "label": value[0], "url": value[1] }
    elif command == "preference":
        if "config" not in paths:
            raise InjectError("Invalid command: %r only valid on /config page"
                              % command)
        elif not isinstance(value, dict):
            raise InjectError("Invalid value for %r: %r (expected object)"
                              % (command, value))

        for name in ("url", "name", "type", "value", "default", "description"):
            if name not in value:
                raise InjectError("Invalid value for %r: %r (missing attribute %r)"
                                  % (command, value, name))

        preference_url = value["url"]
        preference_name = value["name"]
        preference_type = value["type"]
        preference_value = value["value"]
        preference_default = value["default"]
        preference_description = value["description"]

        if not is_string(preference_url):
            raise InjectError("Invalid value for %r: %r (expected attribute 'url' of type string)"
                              % (command, value))
        elif not is_string(preference_name):
            raise InjectError("Invalid value for %r: %r (expected attribute 'name' of type string)"
                              % (command, value))
        elif not is_string(preference_description):
            raise InjectError("Invalid value for %r: %r (expected attribute 'description' of type string)"
                              % (command, value))

        if is_string(preference_type):
            if preference_type not in ("boolean", "integer", "string"):
                raise InjectError("Invalid value for %r: %r (unsupported preference type)"
                                  % (command, value))

            if preference_type == "boolean":
                type_check = lambda value: isinstance(value, bool)
            elif preference_type == "integer":
                type_check = lambda value: isinstance(value, int)
            else:
                type_check = is_string

            if not type_check(preference_value):
                raise InjectError("Invalid value for %r: %r (type mismatch between 'value' and 'type')"
                                  % (command, value))

            if not type_check(preference_default):
                raise InjectError("Invalid value for %r: %r (type mismatch between 'default' and 'type')"
                                  % (command, value))
        else:
            if not isinstance(preference_type, list):
                raise InjectError("Invalid value for %r: %r (invalid 'type', expected string or array)"
                                  % (command, value))

            for index, choice in enumerate(preference_type):
                if not isinstance(choice, dict) \
                        or not isinstance(choice.get("value"), basestring) \
                        or not isinstance(choice.get("title"), basestring):
#.........这里部分代码省略.........
开发者ID:Aessy,项目名称:critic,代码行数:101,代码来源:inject.py


示例20: handle_input

 def handle_input(self, value):
     """Decode the client's JSON requests and register them with the server.

     The decoded requests are stored on the instance, the result list is
     reset, and the server is told about the new requests.
     """
     decoded = json_decode(value)
     self.__requests = decoded
     self.__results = []
     self.server.add_requests(self, decoded)
开发者ID:KurSh,项目名称:critic,代码行数:4,代码来源:utils.py



注:本文中的textutils.json_decode函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python textutils.json_encode函数代码示例发布时间:2022-05-27
下一篇:
Python texture.Texture类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap