• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python perl.decode_object_from_bytes_if_needed函数代码示例

原作者: berkmancenter 来自: Github(mediacloud 项目) 收藏 邀请

本文整理汇总了Python中mediawords.util.perl.decode_object_from_bytes_if_needed函数的典型用法代码示例。如果您正苦于以下问题:Python decode_object_from_bytes_if_needed函数的具体用法?Python decode_object_from_bytes_if_needed怎么用?Python decode_object_from_bytes_if_needed使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了decode_object_from_bytes_if_needed函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: create

def create(db: DatabaseHandler, download: dict, extract: dict) -> dict:
    """Replace the download_texts row of a download with freshly extracted text.

    Any existing download_texts row for the download is deleted, a new row holding extract['extracted_text'] is
    inserted, and the download itself is then flagged as extracted.

    :param db: database handler used to run the queries
    :param download: download dict; only its 'downloads_id' key is read
    :param extract: extractor result dict; only its 'extracted_text' key is read
    :return: the inserted download_texts row, as returned by "INSERT ... RETURNING *"
    """

    # FIXME don't pass freeform "extract" dict, we need just the "extracted_text"

    download = decode_object_from_bytes_if_needed(download)
    extract = decode_object_from_bytes_if_needed(extract)

    downloads_id = download['downloads_id']

    # Step 1: get rid of a stale row (if any)
    db.query("""
        DELETE FROM download_texts
        WHERE downloads_id = %(downloads_id)s
    """, {'downloads_id': downloads_id})

    # Step 2: insert the new text and fetch the resulting row
    insert_result = db.query("""
        INSERT INTO download_texts (downloads_id, download_text, download_text_length)
        VALUES (%(downloads_id)s, %(download_text)s, CHAR_LENGTH(%(download_text)s))
        RETURNING *
    """, {
        'downloads_id': downloads_id,
        'download_text': extract['extracted_text'],
    })
    inserted_row = insert_result.hash()

    # Step 3: mark the parent download as extracted
    db.query("""
        UPDATE downloads
        SET extracted = 't'
        WHERE downloads_id = %(downloads_id)s
    """, {'downloads_id': downloads_id})

    return inserted_row
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:30,代码来源:download_texts.py


示例2: run_job

    def run_job(cls, stories_id: int, topics_id: int) -> None:
        """Extract links from one story in the context of one topic.

        Both IDs may arrive as bytes; they are decoded and cast to int before use. The story and topic rows are loaded
        with require_by_id() and handed to mediawords.tm.extract_story_links.extract_links_for_topic_story().

        :param stories_id: ID of the row to load from the "stories" table
        :param topics_id: ID of the row to load from the "topics" table
        :raises McExtractStoryLinksJobException: if either ID is None, or if anything fails while processing the story
        """
        stories_id = decode_object_from_bytes_if_needed(stories_id) if isinstance(stories_id, bytes) else stories_id
        if stories_id is None:
            raise McExtractStoryLinksJobException("'stories_id' is None.")

        topics_id = decode_object_from_bytes_if_needed(topics_id) if isinstance(topics_id, bytes) else topics_id
        if topics_id is None:
            raise McExtractStoryLinksJobException("'topics_id' is None.")

        stories_id, topics_id = int(stories_id), int(topics_id)
        job_ids = (stories_id, topics_id)

        log.info("Start fetching extracting links for stories_id %d topics_id %d" % job_ids)

        try:
            db = connect_to_db()
            mediawords.tm.extract_story_links.extract_links_for_topic_story(
                db,
                db.require_by_id(table='stories', object_id=stories_id),
                db.require_by_id(table='topics', object_id=topics_id),
            )

        except Exception as ex:
            # Log the short message, but put the full traceback into the exception that gets re-raised
            log.error("Error while processing story {}: {}".format(stories_id, ex))
            failure_details = "Unable to process story {}: {}".format(stories_id, traceback.format_exc())
            raise McExtractStoryLinksJobException(failure_details)

        log.info("Finished fetching extracting links for stories_id %d topics_id %d" % job_ids)
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:30,代码来源:extract_story_links_job.py


示例3: link_canonical_url_from_html

def link_canonical_url_from_html(html: str, base_url: Optional[str] = None) -> Optional[str]:
    """Return the URL of the first usable <link rel="canonical" /> element found in the HTML.

    A canonical URL that passes is_http_url() is returned as is. Any other value is joined with base_url when one was
    given; without a base URL the element is skipped (with a debug message) and scanning continues.

    :param html: HTML to scan (str, or bytes that get decoded)
    :param base_url: optional URL used to resolve non-HTTP (e.g. path-only) canonical values
    :return: canonical URL, or None if no usable one was found
    """
    html = str(decode_object_from_bytes_if_needed(html))

    decoded_base_url = decode_object_from_bytes_if_needed(base_url)
    if decoded_base_url is None:
        base_url = None
    else:
        base_url = str(decoded_base_url)

    for link_tag in re.findall(r'(<\s*?link.+?>)', html, re.I):

        # Only <link> elements declaring rel="canonical" are of interest
        if not re.search(r'rel\s*?=\s*?["\']\s*?canonical\s*?["\']', link_tag, re.I):
            continue

        href_match = re.search(r'href\s*?=\s*?["\'](.+?)["\']', link_tag, re.I)
        if not href_match:
            continue

        url = href_match.group(1)

        if is_http_url(url):
            # Looks like URL, so return it
            return url

        if base_url is not None:
            # Maybe it's absolute path?
            return urljoin(base=base_url, url=url)

        log.debug(
            "HTML <link rel='canonical'/> found, but the new URL '%s' doesn't seem to be valid." % url
        )

    return None
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:25,代码来源:parse_html.py


示例4: find_by_id

    def find_by_id(self, table: str, object_id: int) -> Union[Dict[str, Any], None]:
        """Look up a single row of the table by the value of its primary key column.

        :param table: table to search
        :param object_id: primary key value (anything int() accepts after decoding from bytes)
        :return: matching row as returned by the result's hash(), or None if there is no such row
        :raises McFindByIDException: if the table's primary key column is unknown or more than one row matched
        """

        # MC_REWRITE_TO_PYTHON: some IDs get passed as 'str' / 'bytes'; remove after getting rid of Catalyst
        # noinspection PyTypeChecker
        object_id = int(decode_object_from_bytes_if_needed(object_id))

        table = decode_object_from_bytes_if_needed(table)

        id_column = self.primary_key_column(table)
        if not id_column:
            raise McFindByIDException("Primary key for table '%s' was not found" % table)

        # Table and column names are interpolated by Python; the ID value is left for psycopg2 to substitute
        sql = "SELECT * FROM {} WHERE {}".format(table, id_column) + " = %(id_value)s"
        result = self.query(sql, {'id_value': object_id})

        row_count = result.rows()
        if row_count > 1:
            raise McFindByIDException("More than one row was found for ID '%d' from table '%s'" % (object_id, table))

        if row_count == 1:
            return result.hash()

        return None
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:28,代码来源:handler.py


示例5: store_content

def store_content(db: DatabaseHandler, download: dict, content: str) -> dict:
    """Store the content for the download and update the download's state accordingly.

    The content is written through the store returned by _get_store_for_writing(); afterwards the download's
    'state', 'path' and 'error_message' columns are updated and the row is re-read from the database.

    :param db: database handler
    :param download: download dict; 'downloads_id', 'state' and (for feed_error downloads) 'error_message' are read
    :param content: content to store
    :return: download row as re-read by find_by_id() after the update
    :raises McDBIDownloadsException: if the content could not be stored
    """
    # feed_error state indicates that the download was successful but that there was a problem
    # parsing the feed afterward.  so we want to keep the feed_error state even if we redownload
    # the content

    download = decode_object_from_bytes_if_needed(download)
    content = decode_object_from_bytes_if_needed(content)

    new_state = 'success' if download['state'] != 'feed_error' else 'feed_error'

    try:
        path = _get_store_for_writing().store_content(db, download['downloads_id'], content)
    except Exception as ex:
        # "%s" (not "%d") so that a non-int downloads_id can't make the formatting itself fail with a TypeError and
        # hide the actual storage error; for int IDs the resulting message is the same
        raise McDBIDownloadsException(
            "error while trying to store download %s: %s" % (download['downloads_id'], ex)
        ) from ex

    if new_state == 'success':
        # A successful (re)download clears whatever error was recorded before
        download['error_message'] = ''

    db.update_by_id(
        table='downloads',
        object_id=download['downloads_id'],
        update_hash={'state': new_state, 'path': path, 'error_message': download['error_message']},
    )

    download = db.find_by_id('downloads', download['downloads_id'])

    return download
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:28,代码来源:downloads.py


示例6: _set_extractor_results_cache

def _set_extractor_results_cache(db, download: dict, results: dict) -> None:
    """Upsert the extractor results of a download into cache.extractor_results_cache.

    :param db: database handler used to run the query
    :param download: download dict; only 'downloads_id' is read (and cast to int)
    :param results: extractor results; 'extracted_html' and 'extracted_text' are read
    """

    # This cache is used as a backhanded way of extracting stories asynchronously in the topic spider.  Instead of
    # submitting extractor jobs and then directly checking whether a given story has been extracted, we just
    # throw extraction jobs in chunks into the extractor job and cache the results.  Then if we re-extract
    # the same story shortly after, this cache will hit and the cost will be trivial.

    download = decode_object_from_bytes_if_needed(download)
    results = decode_object_from_bytes_if_needed(results)

    cache_entry = {
        'extracted_html': results['extracted_html'],
        'extracted_text': results['extracted_text'],
        'downloads_id': int(download['downloads_id']),
    }

    # Insert the entry, or overwrite the cached values if the download is cached already
    db.query("""
        INSERT INTO cache.extractor_results_cache (
            extracted_html,
            extracted_text,
            downloads_id
        ) VALUES (
            %(extracted_html)s,
            %(extracted_text)s,
            %(downloads_id)s
        ) ON CONFLICT (downloads_id) DO UPDATE SET
            extracted_html = EXCLUDED.extracted_html,
            extracted_text = EXCLUDED.extracted_text
    """, cache_entry)
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:29,代码来源:downloads.py


示例7: select

    def select(self, table: str, what_to_select: str, condition_hash: dict = None) -> DatabaseResult:
        """Run "SELECT <what_to_select> FROM <table>", optionally filtered by equality conditions.

        :param table: table to select from
        :param what_to_select: column list (or "*") placed verbatim after SELECT
        :param condition_hash: column -> value pairs that are ANDed together in the WHERE clause; may be None
        :return: result of the query
        """

        table = decode_object_from_bytes_if_needed(table)
        what_to_select = decode_object_from_bytes_if_needed(what_to_select)
        condition_hash = decode_object_from_bytes_if_needed(condition_hash)

        # Always work on a private dict so that the caller's one is never modified
        conditions = {} if condition_hash is None else condition_hash.copy()

        # MC_REWRITE_TO_PYTHON: remove after getting rid of Catalyst
        conditions.pop("submit", None)

        # "%(column)s" placeholders are to be resolved by psycopg2, not Python
        where_clauses = [column + " = %(" + column + ")s" for column in conditions]

        # Cast Inline::Python's booleans to Python's booleans
        # MC_REWRITE_TO_PYTHON: remove after porting
        for column, value in list(conditions.items()):
            if type(value).__name__ == '_perl_obj':
                conditions[column] = bool(value)

        sql = ("SELECT %s " % what_to_select) + ("FROM %s " % table)
        if where_clauses:
            sql += "WHERE %s" % " AND ".join(where_clauses)

        return self.query(sql, conditions)
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:35,代码来源:handler.py


示例8: __init__

    def __init__(self,
                 no_dedup_sentences: bool = False,
                 no_delete: bool = False,
                 no_tag_extractor_version: bool = False,
                 use_cache: bool = False,
                 use_existing: bool = False):
        """Store the five extractor flags as booleans.

        Every flag may be passed as a bool, an int-like value or bytes; bytes are decoded first and each value is
        then normalized with bool(int(value)).
        """

        raw_flags = (no_dedup_sentences, no_delete, no_tag_extractor_version, use_cache, use_existing)

        # Pass 1: decode only those values that actually are bytes
        decoded_flags = [
            decode_object_from_bytes_if_needed(flag) if isinstance(flag, bytes) else flag
            for flag in raw_flags
        ]

        # Pass 2: normalize everything to a real bool
        # MC_REWRITE_TO_PYTHON: remove weird casts after Python rewrite
        (
            self.__no_dedup_sentences,
            self.__no_delete,
            self.__no_tag_extractor_version,
            self.__use_cache,
            self.__use_existing,
        ) = [bool(int(flag)) for flag in decoded_flags]
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:31,代码来源:extractor_arguments.py


示例9: send_password_reset_token

def send_password_reset_token(db: DatabaseHandler, email: str, password_reset_link: str) -> None:
    """Email a password reset link to the user.

    An email is sent even when the user or the reset link can't be resolved; placeholder values are used in that case.

    :param db: database handler
    :param email: address of the user who requested the reset
    :param password_reset_link: link passed on to _generate_password_reset_token()
    :raises McAuthResetPasswordException: if the email could not be sent
    """

    email = decode_object_from_bytes_if_needed(email)
    password_reset_link = decode_object_from_bytes_if_needed(password_reset_link)

    # Check if user exists
    try:
        full_name = user_info(db=db, email=email).full_name()
    except Exception as ex:
        log.warning("Unable to fetch user profile for user '%s': %s" % (email, str(ex),))
        full_name = 'Nonexistent user'

    # If user was not found, send an email to a random address anyway to avoid timing attack
    reset_url = _generate_password_reset_token(
        db=db,
        email=email,
        password_reset_link=password_reset_link,
    )
    if not reset_url:
        log.warning("Unable to generate full password reset link for email '%s'" % email)
        # NOTE(review): the address literal below looks like an email-obfuscation artifact; confirm against upstream
        email = '[email protected]'
        reset_url = 'password reset link'

    message = AuthResetPasswordMessage(to=email, full_name=full_name, password_reset_url=reset_url)
    sent = send_email(message)
    if not sent:
        raise McAuthResetPasswordException('Unable to send password reset email.')
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:29,代码来源:reset_password.py


示例10: add_story

def add_story(db: DatabaseHandler, story: dict, feeds_id: int, skip_checking_if_new: bool = False) -> Optional[dict]:
    """If the story is new, add story to the database with the feed of the download as story feed.

    Runs in its own transaction, so it must not be called from within one.

    :param db: database handler
    :param story: story dict to insert into "stories"; 'url', 'guid' and 'media_id' are read here
    :param feeds_id: feed to map the created story to (int, or bytes / str castable to int)
    :param skip_checking_if_new: if true, don't call is_new() before inserting
    :return: created story, or None if the story wasn't created (not new, or GUID conflict)
    :raises McAddStoryException: if called within a transaction or if inserting the story fails for any reason other
        than a "stories_guid" unique constraint violation
    """

    story = decode_object_from_bytes_if_needed(story)
    if isinstance(feeds_id, bytes):
        feeds_id = decode_object_from_bytes_if_needed(feeds_id)
    feeds_id = int(feeds_id)
    if isinstance(skip_checking_if_new, bytes):
        skip_checking_if_new = decode_object_from_bytes_if_needed(skip_checking_if_new)
    skip_checking_if_new = bool(int(skip_checking_if_new))

    if db.in_transaction():
        raise McAddStoryException("add_story() can't be run from within transaction.")

    db.begin()

    db.query("LOCK TABLE stories IN ROW EXCLUSIVE MODE")

    if not skip_checking_if_new:
        if not is_new(db=db, story=story):
            log.debug("Story '{}' is not new.".format(story['url']))
            db.commit()
            return None

    medium = db.find_by_id(table='media', object_id=story['media_id'])

    if story.get('full_text_rss', None) is None:
        # Story doesn't say whether it carries full text, so inherit the setting from its medium
        story['full_text_rss'] = medium.get('full_text_rss', False) or False

        # "or ''" covers a 'description' key that is present but set to None, which would otherwise make len() fail
        if len(story.get('description') or '') == 0:
            story['full_text_rss'] = False

    try:
        story = db.create(table='stories', insert_hash=story)
    except Exception as ex:
        db.rollback()

        # FIXME get rid of this, replace with native upsert on "stories_guid" unique constraint
        if 'unique constraint \"stories_guid' in str(ex):
            log.warning(
                "Failed to add story for '{}' to GUID conflict (guid = '{}')".format(story['url'], story['guid'])
            )
            return None

        else:
            raise McAddStoryException("Error adding story: {}\nStory: {}".format(str(ex), str(story))) from ex

    db.find_or_create(
        table='feeds_stories_map',
        insert_hash={
            'stories_id': story['stories_id'],
            'feeds_id': feeds_id,
        }
    )

    db.commit()

    return story
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:60,代码来源:stories.py


示例11: validate_new_password

def validate_new_password(email: str, password: str, password_repeat: str) -> str:
    """Check whether a new password complies with the strength requirements.

    :param email: email address of the user; must be non-empty and must differ from the password
    :param password: new password
    :param password_repeat: new password, typed in again
    :return: empty string if the password is valid, error message otherwise
    """

    email = decode_object_from_bytes_if_needed(email)
    password = decode_object_from_bytes_if_needed(password)
    password_repeat = decode_object_from_bytes_if_needed(password_repeat)

    if not email:
        return 'Email address is empty.'

    if not password or not password_repeat:
        return 'To set the password, please repeat the new password twice.'

    if password != password_repeat:
        return 'Passwords do not match.'

    length_is_acceptable = __MIN_PASSWORD_LENGTH <= len(password) <= __MAX_PASSWORD_LENGTH
    if not length_is_acceptable:
        return 'Password must be between %d and %d characters length.' % (__MIN_PASSWORD_LENGTH, __MAX_PASSWORD_LENGTH,)

    if password == email:
        return "New password is your email address; don't cheat!"

    # All checks passed
    return ''
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:25,代码来源:password.py


示例12: store_test_data_to_individual_files

def store_test_data_to_individual_files(basename: str, data: list) -> None:
    """Write the given data to disk under the given basename; split the data (list) into individual files.

    Every story is written with store_test_data() into a file named after its 'stories_id', using the basename as
    subdirectory. Files previously stored under the basename are removed first.

    :param basename: name under which the test data set is stored
    :param data: list of story dicts, each with a unique, non-empty 'stories_id'
    :raises McStoreTestDataToIndividualFilesException: if a story has no 'stories_id' or the ID is not unique
    """
    basename = decode_object_from_bytes_if_needed(basename)
    data = decode_object_from_bytes_if_needed(data)

    # Validate the whole list before touching any files
    stories_by_id = {}
    for story in data:
        stories_id = story.get('stories_id', None)
        if not stories_id:
            raise McStoreTestDataToIndividualFilesException("Story ID is unset for story: {}".format(story))

        if stories_id in stories_by_id:
            raise McStoreTestDataToIndividualFilesException(
                "Story ID is not unique (such story already exists in a dict) for story: {}".format(story)
            )

        stories_by_id[stories_id] = story

    # Remove all files before overwriting them (in case the new unit test contains *less* stories, we don't want old
    # files lying around)
    old_data_files = __test_data_files(basename=basename)
    log.info("Will remove old data files at path '{}': {}".format(basename, old_data_files))
    for path in old_data_files:
        os.unlink(path)

    # Write every story to its own file
    for stories_id, story in stories_by_id.items():
        store_test_data(basename=str(stories_id), data=story, subdirectory=basename)
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:28,代码来源:data.py


示例13: password_reset_token_is_valid

def password_reset_token_is_valid(db: DatabaseHandler, email: str, password_reset_token: str) -> bool:
    """Validate password reset token (used for both user activation and password reset).

    :param db: database handler
    :param email: email address of the user the token was issued for
    :param password_reset_token: token to check against the hash stored for the user
    :return: True if the token matches the stored hash; False if it doesn't, if an argument is empty or if the user
        can't be found
    """
    email = decode_object_from_bytes_if_needed(email)
    password_reset_token = decode_object_from_bytes_if_needed(password_reset_token)

    if not email or not password_reset_token:
        log.error("Email and / or password reset token is empty.")
        return False

    # Fetch readonly information about the user
    user_row = db.query("""
        SELECT auth_users_id,
               email,
               password_reset_token_hash
        FROM auth_users
        WHERE email = %(email)s
        LIMIT 1
    """, {'email': email}).hash()

    user_found = user_row is not None and 'auth_users_id' in user_row
    if not user_found:
        log.error("Unable to find user %s in the database." % email)
        return False

    stored_token_hash = user_row['password_reset_token_hash']

    return bool(password_hash_is_valid(password_hash=stored_token_hash, password=password_reset_token))
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:28,代码来源:password.py


示例14: __init__

    def __init__(self,
                 host: str,
                 port: int,
                 username: str,
                 password: str,
                 database: str,
                 do_not_check_schema_version: bool = False):
        """Set up the handler's initial state and connect to PostgreSQL.

        All connection parameters may be passed as bytes; they are decoded first, and the port is cast to int.

        :param host: database host
        :param port: database port
        :param username: database user
        :param password: database password
        :param database: database name
        :param do_not_check_schema_version: passed through unchanged to the connect routine
        """

        # Decode everything up front, before any state is initialized
        # noinspection PyTypeChecker
        connection_args = {
            'host': decode_object_from_bytes_if_needed(host),
            'port': int(decode_object_from_bytes_if_needed(port)),
            'username': decode_object_from_bytes_if_needed(username),
            'password': decode_object_from_bytes_if_needed(password),
            'database': decode_object_from_bytes_if_needed(database),
            'do_not_check_schema_version': do_not_check_schema_version,
        }

        # Initial state of the not-yet-connected handler
        self.__primary_key_columns = {}
        self.__schema_version_check_pids = {}
        self.__print_warnings = True
        self.__in_manual_transaction = False
        self.__conn = None
        self.__db = None

        self.__connect(**connection_args)
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:31,代码来源:handler.py


示例15: find_or_create

    def find_or_create(self, table: str, insert_hash: dict) -> Dict[str, Any]:
        """Select a single row from the database matching the hash or insert a row with the hash values and return the
        inserted row as a hash.

        :param table: table to search / insert into
        :param insert_hash: column -> value pairs used both as SELECT conditions and as values to INSERT; a "submit"
            key is ignored; the caller's dict is not modified
        :return: found or newly created row
        :raises McFindOrCreateException: if there is nothing left to SELECT / INSERT by
        """

        # FIXME probably do this in a serialized transaction?

        table = decode_object_from_bytes_if_needed(table)
        insert_hash = decode_object_from_bytes_if_needed(insert_hash)

        insert_hash = insert_hash.copy()  # To be able to safely modify it

        # MC_REWRITE_TO_PYTHON: remove after getting rid of Catalyst
        insert_hash.pop("submit", None)

        # Must be checked only after "submit" is gone: a hash made up of nothing but "submit" would otherwise get past
        # the check and turn into an unconditional SELECT that matches an arbitrary row
        if not insert_hash:
            raise McFindOrCreateException("Hash to INSERT or SELECT is empty")

        row = self.select(table=table, what_to_select='*', condition_hash=insert_hash)
        if row is not None and row.rows() > 0:
            return row.hash()

        # try to create it, but if some other process has created it because we don't have a lock, just use that one
        try:
            return self.create(table=table, insert_hash=insert_hash)
        except McUniqueConstraintException:
            return self.select(table=table, what_to_select='*', condition_hash=insert_hash).hash()
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:27,代码来源:handler.py


示例16: extract_tarball_to_directory

def extract_tarball_to_directory(archive_file: str, dest_directory: str, strip_root: bool = False) -> None:
    """Unpack a Tar archive (.tar, .tar.gz or .tgz) into a directory by running the "tar" command.

    :param archive_file: path to the archive; must be an existing file
    :param dest_directory: directory passed to tar's "-C" option
    :param strip_root: if true, "--strip 1" is added to strip the archive's root directory
    :raises McExtractTarballToDirectoryException: if the archive doesn't exist, has an unsupported extension or the
        tar command fails
    """

    archive_file = decode_object_from_bytes_if_needed(archive_file)
    dest_directory = decode_object_from_bytes_if_needed(dest_directory)

    if not os.path.isfile(archive_file):
        raise McExtractTarballToDirectoryException("Archive at '%s' does not exist" % archive_file)

    # tar flags to use for every supported file extension
    tar_args_by_extension = {
        ".gz": "-zxf",
        ".tgz": "-zxf",
        ".tar": "-xf",
    }

    archive_file_extension = file_extension(archive_file)
    tar_args = tar_args_by_extension.get(archive_file_extension)
    if tar_args is None:
        raise McExtractTarballToDirectoryException("Unsupported archive '%s' with extension '%s'" %
                                                   (archive_file, archive_file_extension))

    command = ["tar", tar_args, archive_file, "-C", dest_directory]
    if strip_root:
        command.extend(['--strip', '1'])

    try:
        run_command_in_foreground(command)
    except McRunCommandInForegroundException as ex:
        raise McExtractTarballToDirectoryException("Error while extracting archive '%s': %s" % (archive_file, str(ex)))
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:29,代码来源:compress.py


示例17: get_session_lock

def get_session_lock(db: mediawords.db.DatabaseHandler, lock_type: str, lock_id: int, wait: bool = False) -> bool:
    """Get a postgres advisory lock with the lock_type and lock_id as the two keys.

    Arguments:
    db - db handle
    lock_type - must be in LOCK_TYPES dict above
    lock_id - id for the particular lock within the type
    wait - if true, block while waiting for the lock, else return false if the lock is not available

    Returns:
    True if the lock was acquired; without "wait", False if it could not be acquired right away

    Raises:
    McDBLocksException - if lock_type is not in LOCK_TYPES
    """
    lock_type = str(decode_object_from_bytes_if_needed(lock_type))

    if isinstance(lock_id, bytes):
        lock_id = decode_object_from_bytes_if_needed(lock_id)
    lock_id = int(lock_id)

    if isinstance(wait, bytes):
        wait = decode_object_from_bytes_if_needed(wait)
    if isinstance(wait, str):
        # A flag that arrives as text (presumably "0" / "1" once decoded from bytes -- TODO confirm against callers)
        # can't go through plain bool(): bool("0") is True, which would silently turn a non-blocking call into a
        # blocking one. Treat "" and "0" as false and any other string as true.
        wait = wait not in ('', '0')
    else:
        wait = bool(wait)

    log.debug("trying for lock: %s, %d" % (lock_type, lock_id))

    if lock_type not in LOCK_TYPES:
        raise McDBLocksException("lock type not in LOCK_TYPES: %s" % lock_type)

    lock_type_id = LOCK_TYPES[lock_type]

    if wait:
        # Blocks until the lock is granted, so reaching the next line means we hold it
        db.query("select pg_advisory_lock(%(a)s, %(b)s)", {'a': lock_type_id, 'b': lock_id})
        return True
    else:
        r = db.query("select pg_try_advisory_lock(%(a)s, %(b)s) as locked", {'a': lock_type_id, 'b': lock_id}).hash()
        return r['locked']
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:35,代码来源:locks.py


示例18: create_test_story

def create_test_story(db: DatabaseHandler, label: str, feed: dict) -> dict:
    """Create a test story named after the label and map it to the feed.

    :param db: database handler
    :param label: label used in the story's URL, GUID, title and description
    :param feed: feed dict; 'media_id' and 'feeds_id' are read
    :return: created "stories" row
    """

    label = decode_object_from_bytes_if_needed(label)
    feed = decode_object_from_bytes_if_needed(feed)

    story_to_insert = {
        'media_id': int(feed['media_id']),
        'url': "http://story.test/%s" % label,
        'guid': "guid://story.test/%s" % label,
        'title': "story %s" % label,
        'description': "description %s" % label,
        'publish_date': '2016-10-15 08:00:00',
        'collect_date': '2016-10-15 10:00:00',
        'full_text_rss': True,
    }
    story = db.create(table='stories', insert_hash=story_to_insert)

    # Attach the new story to the feed
    feed_story_mapping = {
        'feeds_id': int(feed['feeds_id']),
        'stories_id': int(story['stories_id']),
    }
    db.create(table='feeds_stories_map', insert_hash=feed_story_mapping)

    return story
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:29,代码来源:create.py


示例19: _create_child_download_for_story

def _create_child_download_for_story(db: DatabaseHandler, story: dict, parent_download: dict) -> None:
    """Insert a pending "content" download for the story's URL.

    The new download takes its feed, priority and parent from parent_download. If the story's medium has a
    content_delay set, the download's 'download_time' is pushed that many hours into the future.

    :param db: database handler
    :param story: story dict; 'stories_id', 'url' and 'media_id' are read
    :param parent_download: download dict; 'feeds_id', 'downloads_id' and 'priority' are read
    """
    story = decode_object_from_bytes_if_needed(story)
    parent_download = decode_object_from_bytes_if_needed(parent_download)

    child_download = {
        'feeds_id': parent_download['feeds_id'],
        'stories_id': story['stories_id'],
        'parent': parent_download['downloads_id'],
        'url': story['url'],
        'host': get_url_host(story['url']),
        'type': 'content',
        'sequence': 1,
        'state': 'pending',
        'priority': parent_download['priority'],
        'extracted': False,
    }

    delay_rows = db.query("""
        SELECT content_delay
        FROM media
        WHERE media_id = %(media_id)s
    """, {'media_id': story['media_id']}).flat()
    content_delay = delay_rows[0]

    if content_delay:
        # Delay download of content this many hours. This is useful for sources that are likely to significantly
        # change content in the hours after it is first published.
        utc_now = datetime.datetime.now(datetime.timezone.utc)
        delayed_epoch = int(utc_now.timestamp()) + (content_delay * 60 * 60)
        child_download['download_time'] = get_sql_date_from_epoch(delayed_epoch)

    db.create(table='downloads', insert_hash=child_download)
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:31,代码来源:stories.py


示例20: __init__

    def __init__(self,
                 email: str,
                 full_name: str = None,
                 notes: str = None,
                 active: bool = None,
                 weekly_requests_limit: int = None,
                 weekly_requested_items_limit: int = None,
                 password: str = None,
                 password_repeat: str = None,
                 role_ids: List[int] = None):
        """Initialize the user, validating the password if both password fields were given.

        The profile fields are handed to the parent constructor. The password, its repetition and the role IDs are
        kept on this object.

        :raises McAuthUserException: if both password and password_repeat are set and validate_new_password()
            reports a problem
        """
        parent_fields = {
            'email': email,
            'full_name': full_name,
            'notes': notes,
            'active': active,
            'weekly_requests_limit': weekly_requests_limit,
            'weekly_requested_items_limit': weekly_requested_items_limit,
        }
        super().__init__(**parent_fields)

        password = decode_object_from_bytes_if_needed(password)
        password_repeat = decode_object_from_bytes_if_needed(password_repeat)

        # Validation is skipped unless both password fields are present
        if not (password is None or password_repeat is None):
            validation_error = validate_new_password(
                email=self.email(),
                password=password,
                password_repeat=password_repeat
            )
            if validation_error:
                raise McAuthUserException("Password is invalid: %s" % validation_error)

        self.__password = password
        self.__password_repeat = password_repeat
        self.__role_ids = role_ids
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:34,代码来源:user.py



注:本文中的mediawords.util.perl.decode_object_from_bytes_if_needed函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python core.Logger类代码示例发布时间:2022-05-27
下一篇:
Python db.DatabaseHandler类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap