• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python stringutils.random_string函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中synapse.util.stringutils.random_string函数的典型用法代码示例。如果您正苦于以下问题:Python random_string函数的具体用法?Python random_string怎么用?Python random_string使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了random_string函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_exchange_refresh_token_none

    def test_exchange_refresh_token_none(self):
        """Exchanging a refresh token that was never stored raises StoreError."""
        user_id = stringutils.random_string(32)
        token_factory = TokenGenerator()
        unknown_token = token_factory.generate(user_id)

        # This test inserts no refresh_tokens row before attempting the
        # exchange.
        with self.assertRaises(StoreError):
            yield self.store.exchange_refresh_token(
                unknown_token, token_factory.generate,
            )
开发者ID:Vutsuak16,项目名称:synapse,代码行数:7,代码来源:test_registration.py


示例2: start_purge_history

    def start_purge_history(self, room_id, token,
                            delete_local_events=False):
        """Kick off a background history purge for a room.

        Args:
            room_id (str): The room to purge from

            token (str): topological token to delete events before
            delete_local_events (bool): True to delete local events as well as
                remote ones

        Returns:
            str: unique ID for this purge transaction.

        Raises:
            SynapseError: (400) if the room is already listed in
                self._purges_in_progress_by_room.
        """
        already_purging = room_id in self._purges_in_progress_by_room
        if already_purging:
            message = "History purge already in progress for %s" % (room_id, )
            raise SynapseError(400, message)

        new_purge_id = random_string(16)

        # Logging the id here lets the purge be correlated with the request
        # id that appears on the same log line.
        logger.info("[purge] starting purge_id %s", new_purge_id)

        self._purges_by_id[new_purge_id] = PurgeStatus()
        run_in_background(
            self._purge_history,
            new_purge_id,
            room_id,
            token,
            delete_local_events,
        )
        return new_purge_id
开发者ID:DoubleMalt,项目名称:synapse,代码行数:32,代码来源:pagination.py


示例3: __init__

    def __init__(self, hs):
        """Collect handles from ``hs``, load startup presence and start syncing.

        Args:
            hs: object providing is_mine_id, config.worker_replication_url and
                the get_simple_http_client / get_datastore / get_clock /
                get_notifier accessors used below.
        """
        self.is_mine_id = hs.is_mine_id
        self.http_client = hs.get_simple_http_client()
        self.store = hs.get_datastore()
        self.user_to_num_current_syncs = {}
        self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users"
        self.clock = hs.get_clock()
        self.notifier = hs.get_notifier()

        # Index the presence states handed over by the store by user id.
        states_by_user = {}
        for presence_state in self.store.take_presence_startup_info():
            states_by_user[presence_state.user_id] = presence_state
        self.user_to_current_state = states_by_user

        self.process_id = random_string(16)
        logger.info("Presence process_id is %r", self.process_id)

        self._sending_sync = False
        self._need_to_send_sync = False
        self.clock.looping_call(
            self._send_syncing_users_regularly, UPDATE_SYNCING_USERS_MS,
        )

        # Make sure _on_shutdown gets a chance to run before the reactor stops.
        reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
开发者ID:mebjas,项目名称:synapse,代码行数:26,代码来源:synchrotron.py


示例4: __init__

    def __init__(self, clock):
        """Initialise the connection's bookkeeping state.

        Args:
            clock: object exposing ``time_msec()``; stored as ``self.clock``.
        """
        self.clock = clock

        # Timestamps of the most recent traffic in each direction.
        self.last_received_command = self.clock.time_msec()
        self.last_sent_command = 0

        # When we requested the connection be closed
        self.time_we_closed = None

        # Have we received a ping from the other side
        self.received_ping = False

        self.state = ConnectionStates.CONNECTING

        # The name sent by a client.
        self.name = "anon"
        # To dedupe in case of name clashes.
        self.conn_id = random_string(5)

        # List of pending commands to send once we've established the connection
        self.pending_commands = []

        # The LoopingCall for sending pings.
        self._send_ping_loop = None

        # Per-command counters for each direction.
        self.inbound_commands_counter = CounterMetric(
            "inbound_commands",
            labels=["command"],
        )
        self.outbound_commands_counter = CounterMetric(
            "outbound_commands",
            labels=["command"],
        )
开发者ID:rubo77,项目名称:synapse,代码行数:26,代码来源:protocol.py


示例5: create_content

    def create_content(self, media_type, upload_name, content, content_length,
                       auth_user):
        """Write uploaded content to disk, record it and return its mxc URI.

        This is a generator: it yields on the store and thumbnailing calls and
        hands back its result through ``defer.returnValue``.

        Args:
            media_type: stored as the media's ``media_type``.
            upload_name: stored as the media's ``upload_name``.
            content: bytes written to the local media file.
            content_length: stored as the media's ``media_length``.
            auth_user: stored as the media's ``user_id``.

        Returns:
            The string ``"mxc://<server_name>/<media_id>"``.
        """
        new_media_id = random_string(24)

        target_path = self.filepaths.local_media_filepath(new_media_id)
        self._makedirs(target_path)

        # The content has already been fully uploaded by now, so this write
        # is not expected to block for long.
        with open(target_path, "wb") as out_file:
            out_file.write(content)

        yield self.store.store_local_media(
            media_id=new_media_id,
            media_type=media_type,
            time_now_ms=self.clock.time_msec(),
            upload_name=upload_name,
            media_length=content_length,
            user_id=auth_user,
        )

        yield self._generate_local_thumbnails(
            new_media_id,
            {
                "media_type": media_type,
                "media_length": content_length,
            },
        )

        defer.returnValue("mxc://%s/%s" % (self.server_name, new_media_id))
开发者ID:mebjas,项目名称:synapse,代码行数:28,代码来源:media_repository.py


示例6: __init__

    def __init__(self, hs):
        """Collect handles from ``hs`` and load the startup presence state.

        Args:
            hs: object providing is_mine_id and the get_simple_http_client /
                get_datastore / get_clock / get_notifier accessors used below.
        """
        self.hs = hs
        self.is_mine_id = hs.is_mine_id
        self.http_client = hs.get_simple_http_client()
        self.store = hs.get_datastore()
        self.user_to_num_current_syncs = {}
        self.clock = hs.get_clock()
        self.notifier = hs.get_notifier()

        # Index the presence states handed over by the store by user id.
        states_by_user = {}
        for presence_state in self.store.take_presence_startup_info():
            states_by_user[presence_state.user_id] = presence_state
        self.user_to_current_state = states_by_user

        # user_id -> last_sync_ms. Lists the users that have stopped syncing
        # but we haven't notified the master of that yet
        self.users_going_offline = {}

        # Invoke send_stop_syncing every 10 * 1000 (the interval handed to
        # looping_call).
        stop_syncing_interval = 10 * 1000
        self._send_stop_syncing_loop = self.clock.looping_call(
            self.send_stop_syncing, stop_syncing_interval
        )

        self.process_id = random_string(16)
        logger.info("Presence process_id is %r", self.process_id)
开发者ID:matrix-org,项目名称:synapse,代码行数:25,代码来源:synchrotron.py


示例7: create_event_id

    def create_event_id(self):
        """Return a new event ID string and advance ``self.event_id_count``.

        The local part is the integer clock time, followed by the counter value
        from before the increment, followed by five random characters.
        """
        sequence = str(self.event_id_count)
        self.event_id_count += 1

        pieces = (
            str(int(self.clock.time())),
            sequence,
            random_string(5),
        )

        event_id = EventID.create("".join(pieces), self.hostname)
        return event_id.to_string()
开发者ID:0-T-0,项目名称:synapse,代码行数:9,代码来源:builder.py


示例8: map_request_to_name

    def map_request_to_name(self, request):
        """Choose a not-yet-existing file path for an upload request.

        This is a generator: it yields on ``self.auth.get_user_by_req`` and
        hands the chosen path back through ``defer.returnValue``.

        The file name is ``<base64 user id><24 random chars><suffix>``, where
        the suffix encodes the request's Content-Type (if any).

        Raises:
            SynapseError: (500) if more than 25 regenerated names all clash
                with existing files.
        """
        # auth the user
        auth_user = yield self.auth.get_user_by_req(request)

        # namespace all file uploads on the user (base64 padding stripped)
        encoded_user = base64.urlsafe_b64encode(auth_user.to_string())
        prefix = encoded_user.replace('=', '')

        # use a random string for the main portion
        random_part = random_string(24)

        # suffix with a file extension if we can make one. This is nice to
        # provide a hint to clients on the file information. We will also reuse
        # this info to spit back the content type to the client.
        suffix = ""
        headers = request.requestHeaders
        if headers.hasHeader("Content-Type"):
            content_type = headers.getRawHeaders("Content-Type")[0]
            suffix = "." + base64.urlsafe_b64encode(content_type)
            type_parts = content_type.split("/")
            if type_parts[0].lower() in ["image", "video", "audio"]:
                # be a little paranoid and only allow a-z
                suffix += "." + re.sub("[^a-z]", "", type_parts[-1])

        file_path = os.path.join(self.directory, prefix + random_part + suffix)
        logger.info("User %s is uploading a file to path %s",
                    auth_user.to_string(),
                    file_path)

        # keep trying to make a non-clashing file, with a sensible max attempts
        retries = 0
        while True:
            if not os.path.exists(file_path):
                break
            random_part = random_string(24)
            file_path = os.path.join(
                self.directory, prefix + random_part + suffix
            )
            retries += 1
            if retries > 25:  # really? Really?
                raise SynapseError(500, "Unable to create file.")

        defer.returnValue(file_path)
开发者ID:winsontan520,项目名称:synapse,代码行数:44,代码来源:content_repository.py


示例9: generate_files

 def generate_files(self, config):
     """Ensure the signing key file exists and is not in the old format.

     Args:
         config: mapping with a "signing_key_path" entry.

     If no file exists at that path a freshly generated key is written. If a
     file exists and its first line is a single token (the old format), the
     key is decoded and the file is rewritten. Otherwise nothing is changed.
     """
     key_path = config["signing_key_path"]

     if os.path.exists(key_path):
         existing = self.read_file(key_path, "signing_key")
         first_line = existing.split("\n")[0]
         if len(first_line.split()) != 1:
             # Already in the current format; leave the file alone.
             return

         # handle keys in the old format.
         new_key_id = "a_" + random_string(4)
         converted_key = decode_signing_key_base64(
             NACL_ED25519, new_key_id, first_line
         )
         with open(key_path, "w") as out_file:
             write_signing_keys(out_file, (converted_key,))
         return

     with open(key_path, "w") as out_file:
         new_key_id = "a_" + random_string(4)
         write_signing_keys(
             out_file, (generate_signing_key(new_key_id),),
         )
开发者ID:MorganBauer,项目名称:synapse,代码行数:20,代码来源:key.py


示例10: test_exchange_refresh_token_invalid

    def test_exchange_refresh_token_invalid(self):
        """Exchanging a token that differs from the stored one raises StoreError."""
        user_id = stringutils.random_string(32)
        token_factory = TokenGenerator()
        issued_token = token_factory.generate(user_id)
        stored_token = "%s-wrong" % (issued_token,)

        # Store a token for the user that does not match the one we present.
        self.db_pool.runQuery(
            "INSERT INTO refresh_tokens(user_id, token) VALUES(?,?)",
            (user_id, stored_token,),
        )

        with self.assertRaises(StoreError):
            yield self.store.exchange_refresh_token(
                issued_token, token_factory.generate,
            )
开发者ID:Vutsuak16,项目名称:synapse,代码行数:12,代码来源:test_registration.py


示例11: _get_session_info

    def _get_session_info(self, request, session_id):
        if not session_id:
            # create a new session
            while session_id is None or session_id in self.sessions:
                session_id = stringutils.random_string(24)
            self.sessions[session_id] = {
                "id": session_id,
                LoginType.EMAIL_IDENTITY: False,
                LoginType.RECAPTCHA: False
            }

        return self.sessions[session_id]
开发者ID:gitter-badger,项目名称:synapse,代码行数:12,代码来源:register.py


示例12: test_room_creation_too_long

    def test_room_creation_too_long(self):
        """Creating a room with an over-long alias is rejected with a 400."""
        # The localpart is deliberately kept under the length threshold so
        # that we can make sure the check is applied to the whole alias.
        alias_localpart = random_string(256 - len(self.hs.hostname))
        body = json.dumps({"room_alias_name": alias_localpart})

        request, channel = self.make_request(
            "POST",
            "/_matrix/client/r0/createRoom",
            body,
            access_token=self.user_tok,
        )
        self.render(request)

        self.assertEqual(channel.code, 400, channel.result)
开发者ID:matrix-org,项目名称:synapse,代码行数:12,代码来源:test_directory.py


示例13: send_request

        def send_request(**kwargs):
            """Send one replication request and return the parsed response.

            This is a generator: it yields on payload serialisation, the HTTP
            call and the retry sleep, and hands its result back through
            ``defer.returnValue``.

            Args:
                **kwargs: passed to ``cls._serialize_payload``; the entries
                    named in ``cls.PATH_ARGS`` are also URL-quoted into the
                    request path.

            Raises:
                Exception: if ``cls.METHOD`` is not POST, PUT or GET.
                CodeMessageException: re-raised unless it is a 504 and
                    ``cls.RETRY_ON_TIMEOUT`` is set.
                The result of ``HttpResponseException.to_synapse_error()``
                when the request fails with an HttpResponseException.
            """
            data = yield cls._serialize_payload(**kwargs)

            url_args = [
                urllib.parse.quote(kwargs[name], safe='')
                for name in cls.PATH_ARGS
            ]

            if cls.CACHE:
                # A random transaction id is appended as the last path
                # component for endpoints with CACHE set.
                txn_id = random_string(10)
                url_args.append(txn_id)

            if cls.METHOD == "POST":
                request_func = client.post_json_get_json
            elif cls.METHOD == "PUT":
                request_func = client.put_json
            elif cls.METHOD == "GET":
                request_func = client.get_json
            else:
                # We have already asserted in the constructor that a
                # compatible was picked, but lets be paranoid.
                raise Exception(
                    "Unknown METHOD on %s replication endpoint" % (cls.NAME,)
                )

            uri = "http://%s:%s/_synapse/replication/%s/%s" % (
                host, port, cls.NAME, "/".join(url_args)
            )

            try:
                # We keep retrying the same request for timeouts. This is so that we
                # have a good idea that the request has either succeeded or failed on
                # the master, and so whether we should clean up or not.
                while True:
                    try:
                        result = yield request_func(uri, data)
                        break
                    except CodeMessageException as e:
                        if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
                            raise

                    # Logger.warn is a deprecated alias (removed in Python
                    # 3.13); Logger.warning is the supported spelling.
                    logger.warning("%s request timed out", cls.NAME)

                    # If we timed out we probably don't need to worry about backing
                    # off too much, but lets just wait a little anyway.
                    yield clock.sleep(1)
            except HttpResponseException as e:
                # We convert to SynapseError as we know that it was a SynapseError
                # on the master process that we should send to the client. (And
                # importantly, not stack traces everywhere)
                raise e.to_synapse_error()

            defer.returnValue(result)
开发者ID:matrix-org,项目名称:synapse,代码行数:53,代码来源:_base.py


示例14: _get_session_info

    def _get_session_info(self, session_id):
        if session_id not in self.sessions:
            session_id = None

        if not session_id:
            # create a new session
            while session_id is None or session_id in self.sessions:
                session_id = stringutils.random_string(24)
            self.sessions[session_id] = {
                "id": session_id,
            }

        return self.sessions[session_id]
开发者ID:payingattention,项目名称:synapse,代码行数:13,代码来源:auth.py


示例15: test_room_creation

    def test_room_creation(self):
        """Creating a room with a short alias succeeds with a 200."""
        # Check with an alias of allowed length. There should already be
        # a test that ensures it works in test_register.py, but let's be
        # as cautious as possible here.
        body = json.dumps({"room_alias_name": random_string(5)})

        request, channel = self.make_request(
            "POST",
            "/_matrix/client/r0/createRoom",
            body,
            access_token=self.user_tok,
        )
        self.render(request)

        self.assertEqual(channel.code, 200, channel.result)
开发者ID:matrix-org,项目名称:synapse,代码行数:13,代码来源:test_directory.py


示例16: test_exchange_refresh_token_valid

    def test_exchange_refresh_token_valid(self):
        """Exchanging a stored refresh token returns the user and a new token.

        Also checks that the refresh_tokens row for the user now holds the
        returned token and device id.
        """
        user_id = stringutils.random_string(32)
        original_device = stringutils.random_string(16)
        token_factory = TokenGenerator()
        first_token = token_factory.generate(user_id)

        self.db_pool.runQuery(
            "INSERT INTO refresh_tokens(user_id, token, device_id) "
            "VALUES(?,?,?)",
            (user_id, first_token, original_device),
        )

        exchanged = yield self.store.exchange_refresh_token(
            first_token, token_factory.generate,
        )
        (found_user_id, new_token, found_device) = exchanged
        self.assertEqual(user_id, found_user_id)

        stored_rows = yield self.db_pool.runQuery(
            "SELECT token, device_id FROM refresh_tokens WHERE user_id = ?",
            (user_id, ),
        )
        self.assertEqual([(new_token, found_device)], stored_rows)

        # We issued token 1, then exchanged it for token 2
        self.assertEqual(u"%s-%d" % (user_id, 2,), new_token)
开发者ID:mebjas,项目名称:synapse,代码行数:23,代码来源:test_registration.py


示例17: _download_remote_file

    def _download_remote_file(self, server_name, media_id):
        """Download remote media to disk, record it and generate thumbnails.

        This is a generator: it yields on the download, the store call and the
        thumbnailing, and hands its result back through ``defer.returnValue``.

        Args:
            server_name: server to fetch from; also stored as the ``origin``.
            media_id: the media ID used in the download path and the store.

        Returns:
            dict with the keys "media_type", "media_length", "upload_name",
            "created_ts" and "filesystem_id".

        Raises:
            Whatever the download or the store call raises; the partially
            written local file is removed before re-raising.
        """
        file_id = random_string(24)

        fname = self.filepaths.remote_media_filepath(
            server_name, file_id
        )
        self._makedirs(fname)

        try:
            with open(fname, "wb") as f:
                request_path = "/".join((
                    "/_matrix/media/v1/download", server_name, media_id,
                ))
                length, headers = yield self.client.get_file(
                    server_name, request_path, output_stream=f,
                    max_size=self.max_upload_size,
                )
            media_type = headers["Content-Type"][0]

            # Read the clock once so that the timestamp written to the store
            # and the "created_ts" returned to the caller are the same value.
            time_now_ms = self.clock.time_msec()

            yield self.store.store_cached_remote_media(
                origin=server_name,
                media_id=media_id,
                media_type=media_type,
                time_now_ms=time_now_ms,
                upload_name=None,
                media_length=length,
                filesystem_id=file_id,
            )
        except:  # noqa: E722 -- deliberate: clean up on any failure, then re-raise
            os.remove(fname)
            raise

        media_info = {
            "media_type": media_type,
            "media_length": length,
            "upload_name": None,
            "created_ts": time_now_ms,
            "filesystem_id": file_id,
        }

        yield self._generate_remote_thumbnails(
            server_name, media_id, media_info
        )

        defer.returnValue(media_info)
开发者ID:heavenlyhash,项目名称:synapse,代码行数:46,代码来源:base_resource.py


示例18: send_device_message

    def send_device_message(self, sender_user_id, message_type, messages):
        """Queue to-device messages for local users and remote destinations.

        This is a generator: it yields on the store call.

        Args:
            sender_user_id: recorded as the "sender" of every message.
            message_type: recorded as the "type" of every message.
            messages: mapping of user id -> mapping of device id -> content.
        """
        local_messages = {}
        remote_messages = {}
        for user_id, by_device in messages.items():
            # we use UserID.from_string to catch invalid user ids
            if not self.is_mine(UserID.from_string(user_id)):
                destination = get_domain_from_id(user_id)
                remote_messages.setdefault(destination, {})[user_id] = by_device
                continue

            messages_by_device = {
                device_id: {
                    "content": message_content,
                    "type": message_type,
                    "sender": sender_user_id,
                }
                for device_id, message_content in by_device.items()
            }
            if messages_by_device:
                local_messages[user_id] = messages_by_device

        message_id = random_string(16)

        # One EDU body per remote destination, all sharing the same message id.
        remote_edu_contents = {
            destination: {
                "messages": outgoing,
                "sender": sender_user_id,
                "type": message_type,
                "message_id": message_id,
            }
            for destination, outgoing in remote_messages.items()
        }

        stream_id = yield self.store.add_messages_to_device_inbox(
            local_messages, remote_edu_contents
        )

        self.notifier.on_new_event(
            "to_device_key", stream_id, users=local_messages.keys()
        )

        for destination in remote_messages:
            # Enqueue a new federation transaction to send the new
            # device messages to each remote destination.
            self.federation.send_device_messages(destination)
开发者ID:DoubleMalt,项目名称:synapse,代码行数:44,代码来源:devicemessage.py


示例19: _get_remote_media_impl

    def _get_remote_media_impl(self, server_name, media_id):
        """Fetch remote media from the local cache, downloading it if missing.

        Args:
            server_name (str): Remote server_name where the media originated.
            media_id (str): The media ID of the content (as defined by the
                remote server).

        Returns:
            Deferred[(Responder, media_info)]

        Raises:
            NotFoundError: if the cached entry is marked as quarantined.
        """
        media_info = yield self.store.get_cached_remote_media(
            server_name, media_id
        )

        # file_id is the ID we use to track the file locally. If we've already
        # seen the file then reuse the existing ID, otherwise generate a new
        # one.
        file_id = media_info["filesystem_id"] if media_info else random_string(24)

        file_info = FileInfo(server_name, file_id)

        # If we have an entry in the DB, try and look for it
        if media_info:
            quarantined_by = media_info["quarantined_by"]
            if quarantined_by:
                logger.info("Media is quarantined")
                raise NotFoundError()

            cached_responder = yield self.media_storage.fetch_media(file_info)
            if cached_responder:
                defer.returnValue((cached_responder, media_info))

        # Failed to find the file anywhere, lets download it.
        downloaded_info = yield self._download_remote_file(
            server_name, media_id, file_id
        )

        fresh_responder = yield self.media_storage.fetch_media(file_info)
        defer.returnValue((fresh_responder, downloaded_info))
开发者ID:rubo77,项目名称:synapse,代码行数:44,代码来源:media_repository.py


示例20: check_device_registered

    def check_device_registered(self, user_id, device_id,
                                initial_device_display_name=None):
        """
        Register the given device with the supplied display name unless it is
        already known.

        If no device_id is supplied, we make one up.

        Args:
            user_id (str):  @user:id
            device_id (str | None): device id supplied by client
            initial_device_display_name (str | None): device display name from
                 client
        Returns:
            str: device id (generated if none was supplied)
        Raises:
            errors.StoreError: (500) if five generated device IDs in a row
                are rejected by the store.
        """
        if device_id is not None:
            yield self.store.store_device(
                user_id=user_id,
                device_id=device_id,
                initial_device_display_name=initial_device_display_name,
                ignore_if_known=True,
            )
            defer.returnValue(device_id)

        # if the device id is not specified, we'll autogen one, but loop a few
        # times in case of a clash.
        for _ in range(5):
            try:
                device_id = stringutils.random_string(10).upper()
                yield self.store.store_device(
                    user_id=user_id,
                    device_id=device_id,
                    initial_device_display_name=initial_device_display_name,
                    ignore_if_known=False,
                )
            except errors.StoreError:
                continue
            else:
                defer.returnValue(device_id)

        raise errors.StoreError(500, "Couldn't generate a device ID.")
开发者ID:mebjas,项目名称:synapse,代码行数:42,代码来源:device.py



注:本文中的synapse.util.stringutils.random_string函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python synapseclient.File类代码示例发布时间:2022-05-27
下一篇:
Python logcontext.LoggingContext类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap