• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python log.info函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中top.utils.log.log.info函数的典型用法代码示例。如果您正苦于以下问题:Python info函数的具体用法?Python info怎么用?Python info使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了info函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _start

    def _start(self, event):
        """Override the :method:`top.utils.Daemon._start` method.

        Will perform a single iteration in dry and batch modes.

        **Args:**
            *event* (:mod:`threading.Event`): Internal semaphore that
            can be set via the :mod:`signal.signal.SIGTERM` signal event
            to perform a function within the running proess.

        """
        signal.signal(signal.SIGTERM, self._exit_handler)

        if self.reminder is None:
            self._reminder = top.Reminder(**(self.reminder_kwargs))

        while not event.isSet():
            if self.reminder.db():
                self.reminder.process(dry=self.dry)
            else:
                log.error('ODBC connection failure -- aborting')
                event.set()
                continue

            if not event.isSet():
                if self.dry:
                    log.info('Dry run iteration complete -- aborting')
                    event.set()
                elif self.batch:
                    log.info('Batch run iteration complete -- aborting')
                    event.set()
                else:
                    time.sleep(self.loop)
开发者ID:loum,项目名称:top,代码行数:33,代码来源:reminderdaemon.py


示例2: connect_resource

    def connect_resource(self, xfer):
        """ Connect to the FTP resource.

        """
        host = self.config.get(xfer, 'host')
        port = self.config.get(xfer, 'port')
        user = self.config.get(xfer, 'user')
        password = self.config.get(xfer, 'password')

        try:
            proxy = self.config.get(xfer, 'proxy')
        except ConfigParser.NoOptionError:
            proxy = None

        status = True
        try:
            if proxy is None:
                log.info('Connecting to "%s:%s"' % (host, port))
                self.connect(host=host, port=port)
            else:
                log.info('Connecting to proxy "%s"' % proxy)
                self.connect(host=proxy)
        except socket.error, err:
            log.error('Connection failed: %s' % err)
            status = False
开发者ID:loum,项目名称:top,代码行数:25,代码来源:ftp.py


示例3: get_xfer_files

    def get_xfer_files(self, source, filter=None, is_pod=False):
        """For outbound file transfers, get a list of files to transfer.

        **Args:**
            *source*: directory path where outbound files can be found

        **Kwargs:**
            *filter*: regular expression string to use to filter filenames

            *is_pod*: POD file require extra processing to identify
            associated signature files.

        """
        files_to_xfer = []

        for report in self.get_report_file(source, filter):
            if is_pod:
                for key in self.get_report_file_ids(report):
                    for ext in ['ps', 'png']:
                        pod = os.path.join(source, '%s.%s' % (key, ext))
                        files_to_xfer.append(pod)

            files_to_xfer.append(report)

        if not len(files_to_xfer):
            log.info('No files set to be transferred')

        return files_to_xfer
开发者ID:loum,项目名称:top,代码行数:28,代码来源:ftp.py


示例4: inbound

    def inbound(self, xfer, dry=False):
        """Incoming file transfer.

        **Args:**
            *xfer*: a :mod:`ConfigParser` section that represents
            an FTP transfer instance

        """
        log.info('Preparing inbound xfer ...')

        if self.connect_resource(xfer):
            xfer_set = []
            try:
                source = self.config.get(xfer, 'source')
            except ConfigParser.NoOptionError:
                source = None
            if source is not None:
                log.info('Setting CWD on server to "%s"' % source)
                self.cwd(source)

            try:
                partial = (self.config.get(xfer,
                                           'partial').lower() == 'yes')
            except ConfigParser.NoOptionError, err:
                partial = False

            try:
                remove_on_xfer = (self.config.get(xfer,
                                                  'delete').lower() == 'yes')
            except ConfigParser.NoOptionError, err:
                remove_on_xfer = False
开发者ID:loum,项目名称:top,代码行数:31,代码来源:ftp.py


示例5: process

    def process(self, raw):
        """Accepts an unformatted T1250 record *raw* and processes the
        translation to Toll Outlet Portal T1250 format.

        **Args:**
            *raw*: the source record to translate

        **Returns:**
            string representation of a T1250 record (1248 character
            length)

        """
        translated_line = tuple()
        parsed_dict = self.parser.parse_line(raw)
        connote_nbr = parsed_dict.get('Conn Note')
        log.info('Processing connote: "%s" ...' % connote_nbr)

        if (parsed_dict.get('ADP Type') is not None and
            parsed_dict.get('ADP Type') == 'PE'):
            log.info('Connote "%s" has a PE flag' % connote_nbr)
            bu = parsed_dict.get('System Identifier')

            # Need to fudge the Identifier field constant, 'YMLML11'.
            identifier_str = parsed_dict.get('Identifier')
            if identifier_str is not None:
                parsed_dict['Identifier'] = ('%s%s' % ('YMLML11',
                                                       identifier_str[7:]))
            translated_line = (bu, self.translate(parsed_dict))
        else:
            log.error('Unexpected "ADP Type" value: "%s"' %
                      parsed_dict.get('ADP Type'))

        return translated_line
开发者ID:loum,项目名称:top,代码行数:33,代码来源:mapper.py


示例6: ignore_record

    def ignore_record(self, agent_code):
        """Manages Business Unit rules that determine if a raw T1250
        record should be ignored.

        Current ignore rules include:

        * *Agent Id* starts with a ``P`` (this indicates a ParcelPoint job)

        **Args:**
            *agent_code*: the raw agent code string as parsed from the
            T1250 file.  For example:: ``N013``.

        **Returns:**
            boolean ``True`` if record should be ignored

            boolean ``False`` otherwise

        """
        log.info('Checking ignore rules ...')
        ignore = False

        log.debug('Checking agent code "%s"' % agent_code)
        if agent_code is not None:
            if agent_code.startswith('P'):
                log.info('Agent code "%s" starts with "P" -- ignoring' %
                         agent_code)
                ignore = True

        return ignore
开发者ID:loum,项目名称:top,代码行数:29,代码来源:loader.py


示例7: outfile

    def outfile(self, dir, identifier, state='VIC'):
        """Creates the Exporter output file based on current timestamp
        and verifies creation at the staging directory *dir*.

        During output file access, prepends ``.tmp`` to the file.

        **Args:**
            dir: base directory name of the staging directory

            identifier: business unit specific file identifier

        **Returns:**
            open file handle to the exporter report file (or None if file
            access fails)

        """
        status = True
        fh = None

        create_dir(dir)

        if status:
            # Create the output file.
            time = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
            file = ("%s_%s_%s%s_%s.txt.tmp" %
                    (state, 'VANA', 'RE', identifier, time))
            file_path = os.path.join(dir, file)
            try:
                log.info('Opening file "%s"' % file_path)
                fh = open(file_path, 'wb')
            except IOError, err:
                status = False
                log.error('Could not open out file "%s": %s')
开发者ID:loum,项目名称:top,代码行数:33,代码来源:exporter.py


示例8: validate

    def validate(self, email):
        """Validate the *email* address.

        Runs a simple regex validation across the *email* address is

        **Args:**
            email: the email address to validate

        **Returns:**
            boolean ``True`` if the email validates

            boolean ``False`` if the email does not validate

        """
        status = True

        err = 'Email "%s" validation:' % email
        ex = '^[a-zA-Z0-9._%\-+][email protected][a-zA-Z0-9._%-]+\.[a-zA-Z]{2,6}$'
        r = re.compile(ex)
        m = r.match(email)
        if m is None:
            status = False
            log.error('%s Failed' % err)
        else:
            log.info('%s OK' % err)

        return status
开发者ID:loum,项目名称:top,代码行数:27,代码来源:emailerbase.py


示例9: rename_signature_files

    def rename_signature_files(self, dir, old_token, new_token, dry=False):
        """Search *dir* for files which match the filter *old_token*
        and rename with *new_token*.  The original file name extension
        will be retained.

        **Args:**
            *dir*: directory to search

            *old_token*: the token to use as a filter for file matches that
            will be renamed

            *new_token*: the new filename

        **Returns:**
            for the files that were renamed, a list of the new filenames

        """
        signature_files = []

        files = get_directory_files_list(dir, filter='%s\..*' % old_token)
        log.info('Found old signature files: "%s"' % files)

        for f in files:
            (fn, ext) = os.path.splitext(f)
            target = os.path.join(os.path.dirname(f),
                                  '%s%s' % (new_token, ext))
            move_file(f, target, dry=dry)
            signature_files.append(target)

        return signature_files
开发者ID:loum,项目名称:top,代码行数:30,代码来源:podtranslator.py


示例10: set_out_dir

    def set_out_dir(self, business_unit):
        """Uses the *business_unit* name to construct the output directory
        to which the report and signature files will be placed for further
        processing.

        Staging directories are based on the Business Unit.  For example,
        the Business Unit "Priority" will create the directory
        ``priority/out`` off the base staging directory.

        Will check if the output directory structure exists before
        attempting to create it.

        **Args:**
            business_unit: name of the Business Unit that is associated
            with the collected items output files.

        """
        if business_unit is None:
            self._out_dir = None
        else:
            log.info('Checking output directory for "%s" ...' %
                     business_unit)
            try:
                self._out_dir = os.path.join(self.staging_dir,
                                             business_unit.lower(),
                                             'out')
                create_dir(self._out_dir)
            except AttributeError, err:
                log.error('Output directory error: "%s"' % err)
                self._out_dir = None
开发者ID:loum,项目名称:top,代码行数:30,代码来源:exporter.py


示例11: process

    def process(self, comms_file, dry=False):
        """Attempts to send comms via appropratie medium based on
        *comms_file* comms event file.

        Successful notifications will set the ``job_item.notify`` column
        if the corresponding ``job_item.id``.

        **Args:**
            *comms_file*: absolute path to the comms file to process.

        **Kwargs:**
            *dry*: only report, do not execute (default ``False``)

        **Returns:**
            boolean ``True`` if *comms_file* is processed successfully

            boolean ``False`` otherwise

        """
        log.info('Processing comms file: "%s" ...' % comms_file)
        action = None
        id = None
        template = None
        comms_file_err = comms_file + '.err'
        comms_status = True
        filename = os.path.basename(comms_file)

        try:
            (action, id, template) = self.parse_comms_filename(filename)
        except ValueError, err:
            log.error('%s processing error: %s' % (comms_file, err))
            move_file(comms_file, comms_file_err, err=True, dry=dry)
            comms_status = False
开发者ID:loum,项目名称:top,代码行数:33,代码来源:comms.py


示例12: process

    def process(self, id=None, dry=False):
        """Checks ``agent_stocktake`` table for agents that have not
        completed a stocktake in the current period.

        **Kwargs:**
            *dry*: do not execute, only report

        **Returns:**
            list of ``agent_stocktake`` IDs of agents that have not
            performed a stocktake in the current period.

        """
        log.info('Agent compliance query ...')

        kwargs = {'period': self.period,
                  'delivery_partners': self.delivery_partners}
        sql = self.db.jobitem.agent_id_of_aged_parcels(**kwargs)
        self.db(sql)
        self.set_columns(self.db.columns())
        agents = list(self.db.rows())

        cleansed_agents = []
        for i in agents:
            cleansed_agents.append(self._cleanse(self.columns, i))

        log.info('Agent compliance query complete')

        return cleansed_agents
开发者ID:loum,项目名称:top,代码行数:28,代码来源:compliance.py


示例13: write

    def write(self, data, fhs, delivery_partner, infile, dry=False):
        """Write out *data* to the associated *fhs* file handler.

        *fhs* is based on the return value from
        :meth:`top.FilterDaemon.get_outbound_file`

        **Args:**
            *data*: the line item to write out

            *fhs*: dictionary structure capturing open file handle objects

            *delivery_partner*: name of the Delivery Partner that will
            receive the filtered T1250 file

            *infile*: source T1250 EDI file that is being filtered

        """
        log.info('Writing out connote "%s" ...' % data[0:20].rstrip())

        outfile = self.get_outbound_file(delivery_partner, infile)
        fh = fhs.get(outfile)
        if fh is None:
            log.info('Preparing file handler for outfile %s' % outfile)
            if not dry:
                fhs[outfile] = open(outfile, 'w')
                fh = fhs[outfile]

        if not dry:
            fh.write('%s' % data)
开发者ID:loum,项目名称:top,代码行数:29,代码来源:filterdaemon.py


示例14: get_item_number_job_id

    def get_item_number_job_id(self, item_nbr):
        """Checks the "job" table for related "job_item" records with
        *item_nbr*.  If more than one job exists, will sort against the
        job_ts against the most recent record.

        **Args:**
            item_nbr: item_nbr value parsed directly from the T1250 file

        **Returns:**
            integer value relating to the job.id if match is found.

            ``None`` otherwise.

        """
        log.info('Check for job records with item_nbr "%s"' % item_nbr)
        job_id = None

        sql = self.db.job.item_nbr_based_job_sql(item_nbr=item_nbr)
        self.db(sql)
        received = []
        for row in self.db.rows():
            received.append(row[0])

        if received:
            # Results are sorted by job_ts so grab the first index.
            job_id = received[0]
            log.info('Connote check -- job record id %d found' % job_id)

        return job_id
开发者ID:loum,项目名称:top,代码行数:29,代码来源:loader.py


示例15: validate_file

    def validate_file(self, filename):
        """Parse the T1250-format filename string and attempt to extract
        the Business Unit and file timestamp.

        **Kwargs:**
            filename: the filename string to parse

        **Returns:**
            tuple stucture as (<business_unit>, <timestamp>)

        """
        log.debug('Validating filename: "%s"' % filename)
        m = re.search('T1250_(TOL.*)_(\d{14})\.txt', filename)
        bu = None
        dt_formatted = None
        if m is None:
            log.error('Could not parse BU/time from file "%s"' % filename)
        else:
            bu = m.group(1).lower()
            file_timestamp = m.group(2)
            parsed_time = time.strptime(file_timestamp, "%Y%m%d%H%M%S")
            log.debug('parsed_time: %s' % parsed_time)
            dt = datetime.datetime.fromtimestamp(time.mktime(parsed_time))
            dt_formatted = dt.isoformat(' ')
            log.info('Parsed BU/time "%s/%s" from file "%s"' %
                     (bu, dt_formatted, filename))

        return (bu, dt_formatted)
开发者ID:loum,项目名称:top,代码行数:28,代码来源:loaderdaemon.py


示例16: comms

    def comms(self,
              method,
              job_item_id,
              recipient,
              template='body',
              dry=False):
        """Prepare comms event files.

        **Args:**
            *job_item_id*: integer as per the ``job_item.id`` column

            *recipient*: comms email recipient address

            *method*: either ``email`` or ``sms``

            *dry*: only report, do not execute

        """
        log.info('Checking comms for id|recipient|method: %s|%s|%s' %
                 (job_item_id, recipient, method))

        if recipient is not None and recipient:
            self.flag_comms(method,
                            job_item_id,
                            template,
                            dry=dry)
开发者ID:loum,项目名称:top,代码行数:26,代码来源:loader.py


示例17: get_agent_id

    def get_agent_id(self, agent):
        """Helper method to verify if an Agent ID is defined.

        **Args:**
            *agent*: the raw agent string (for example, ``N001``)

        **Returns:**
            on success, integer value taken from ``agent.id`` representing
            the *agent* details.  ``None`` otherwise.

        """
        log.info('Checking if Agent Id "%s" exists in system ...' % agent)
        self.db(self.db._agent.check_agent_id(agent_code=agent))
        agent_id_row_id = self.db.row

        if agent_id_row_id is not None:
            agent_id_row_id = agent_id_row_id[0]
            log.info('Agent Id "%s" found: %s' % (agent, agent_id_row_id))
        else:
            self.set_alerts('Agent Id "%s" does not exist' % agent)

            # For testing only, create the record.
            # First insert will fail the test -- subsequent checks should
            # be OK.  This will ensure we get a mix during testing.
            if self.db.host is None:
                log.debug('TEST: Creating Agent Id "%s" record ...' % agent)
                agent_fields = {'code': agent}
                sql = self.db._agent.insert_sql(agent_fields)
                # Just consume the new row id.
                id = self.db.insert(sql)

        return agent_id_row_id
开发者ID:loum,项目名称:top,代码行数:32,代码来源:loader.py


示例18: check_mobile_missing_leading_zero

    def check_mobile_missing_leading_zero(self, mobile_number):
        """Checks for a special case where the *mobile_number* number could
        have the leading ``0`` missing.

        The general algorithm is that a 9-digit *mobile_number*  with
        leading ``4`` will have a ``0`` prepended to complete a valid
        mobile number construct.

        **Args:**
            mobile_number: the mobile number to validate

        **Returns:**
            The transposed mobile number if the leading ``0`` scenario is
            encounterd.  Otherwise, the original value of *mobile_number*.

        """
        tmp_mobile_number = mobile_number

        regex = re.compile("\d{9}$")
        m = regex.match(mobile_number)
        if m is not None:
            if mobile_number[0] == '4':
                tmp_mobile_number = '0%s' % mobile_number
                log.info('Prepended "0" to mobile "%s" to produce "%s"' %
                         (mobile_number, tmp_mobile_number))

        return tmp_mobile_number
开发者ID:loum,项目名称:top,代码行数:27,代码来源:restsmser.py


示例19: get_files

    def get_files(self, dir=None, formats=None):
        """Identifies files that are to be processed.

        **Args:**
            *dir*: override :attr:`in_dirs` for directories to search

            *formats*: list of regular expressions to apply across
            directory files

        **Returns:**
            list of files found

        """
        if formats is None:
            formats = []

        files_to_process = []

        dirs_to_check = self.in_dirs
        if dir is not None:
            dirs_to_check = dir

        for dir_to_check in dirs_to_check:
            log.debug('Looking for files at: %s ...' % dir_to_check)
            for file in get_directory_files(dir_to_check):
                for format in formats:
                    if (check_filename(file, format)):
                        log.info('Found file: %s' % file)
                        files_to_process.append(file)

        files_to_process.sort()
        log.debug('Files set to be processed: "%s"' % str(files_to_process))

        return files_to_process
开发者ID:loum,项目名称:top,代码行数:34,代码来源:daemonservice.py


示例20: archive_file

    def archive_file(self, file, dry=False):
        """Will attempt to move *file* to the ADP archive directory.

        .. note::

            All attempts will be made to archive the original ``*.xlsx``
            and ``*.csv`` variant.

        **Args:**
            *file*: string representation of the filename to archive

        """
        filename = os.path.splitext(file)[0]
        file_base = os.path.basename(filename)
        log.info('Archiving "%s.*"' % filename)

        if self.archive_dir is not None:
            archive_path = os.path.join(self.archive_dir, 'adp')
            if not dry:
                for ext in ['xlsx', 'csv']:
                    move_file("%s.%s" % (filename, ext),
                              os.path.join(archive_path, "%s.%s" %
                                                         (file_base, ext)))
        else:
            log.info('Archive directory not defined -- archiving disabled')
开发者ID:loum,项目名称:top,代码行数:25,代码来源:adpdaemon.py



注:本文中的top.utils.log.log.info函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python celldict.GlobalsDict类代码示例发布时间:2022-05-27
下一篇:
Python log.error函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap