• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python pyinotify.Notifier类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pyinotify.Notifier的典型用法代码示例。如果您正苦于以下问题:Python Notifier类的具体用法?Python Notifier怎么用?Python Notifier使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Notifier类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: enabled

    def enabled(self):
        if not self.running:
            wm = WatchManager()
            self.event_handler = LibraryEvent(app.library)

            # Choose event types to watch for
            # FIXME: watch for IN_CREATE or for some reason folder copies
            # are missed,  --nickb
            FLAGS = ['IN_DELETE', 'IN_CLOSE_WRITE',# 'IN_MODIFY',
                     'IN_MOVED_FROM', 'IN_MOVED_TO', 'IN_CREATE']
            mask = reduce(lambda x, s: x | EventsCodes.ALL_FLAGS[s], FLAGS, 0)

            if self.USE_THREADS:
                print_d("Using threaded notifier")
                self.notifier = ThreadedNotifier(wm, self.event_handler)
                # Daemonize to ensure thread dies on exit
                self.notifier.daemon = True
                self.notifier.start()
            else:
                self.notifier = Notifier(wm, self.event_handler, timeout=100)
                GLib.timeout_add(1000, self.unthreaded_callback)

            for path in get_scan_dirs():
                print_d('Watching directory %s for %s' % (path, FLAGS))
                # See https://github.com/seb-m/pyinotify/wiki/
                # Frequently-Asked-Questions
                wm.add_watch(path, mask, rec=True, auto_add=True)

            self.running = True
开发者ID:MikeiLL,项目名称:quodlibet,代码行数:29,代码来源:auto_library_update.py


示例2: watch_path

def watch_path(path, add_watch_opt=None, watcher_opt=None):
    """Tail all the files specify by path.
    
    path:  By default all files under the path, which should be a directory, are tailed.
           Path could also be list of paths or a glob pattern.
           The behavior can be modified by add_watch_opt. 
           See pyinotify.WatchManager.add_watch.

    output: defaults to stdout. 
            can be diverted any callable with:
                   watcher_opt=dict(out=got_log_line)
            where got_log_line() takes a tuple (log_path, log_line)

    *_opt: Are pass-through to pyinotify.WatchManager.add_watch and tailall.Monitor.
           See respective functions for detail.
    """

    wm = WatchManager()
    notifier = Notifier(wm, default_proc_fun=FsEvent())
    #mask=ALL_EVENTS
    #mask=IN_MOVED_TO|IN_CREATE|IN_MODIFY
    mask=IN_MODIFY|IN_CLOSE_WRITE
    kw=dict(rec=True, auto_add=False)
    kw.update(add_watch_opt or {})
    wm.add_watch(path, mask, **kw)

    monitor=Monitor(watcher_opt=watcher_opt)

    notifier.loop(callback=monitor.got_event)
开发者ID:tengu,项目名称:tailall,代码行数:29,代码来源:tailall.py


示例3: fsMonitor

def fsMonitor(path="/data"):
    wm = WatchManager()
    mask = IN_DELETE | IN_MODIFY | IN_CREATE
    notifier = Notifier(wm, EventHandler(), read_freq=10)
    notifier.coalesce_events()
    wm.add_watch(path, mask, rec=True, auto_add=True)
    notifier.loop()
开发者ID:soone,项目名称:docker-rsync,代码行数:7,代码来源:pyrsync.py


示例4: enabled

    def enabled(self):
        if not self.running:
            wm = WatchManager()
            self.event_handler = LibraryEvent(library=app.library)

            FLAGS = ['IN_DELETE', 'IN_CLOSE_WRITE', # 'IN_MODIFY',
                     'IN_MOVED_FROM', 'IN_MOVED_TO', 'IN_CREATE']

            masks = [EventsCodes.FLAG_COLLECTIONS['OP_FLAGS'][s]
                     for s in FLAGS]
            mask = reduce(operator.or_, masks, 0)

            if self.USE_THREADS:
                print_d("Using threaded notifier")
                self.notifier = ThreadedNotifier(wm, self.event_handler)
                # Daemonize to ensure thread dies on exit
                self.notifier.daemon = True
                self.notifier.start()
            else:
                self.notifier = Notifier(wm, self.event_handler, timeout=100)
                GLib.timeout_add(1000, self.unthreaded_callback)

            for path in get_scan_dirs():
                real_path = os.path.realpath(path)
                print_d('Watching directory %s for %s (mask: %x)'
                        % (real_path, FLAGS, mask))
                # See https://github.com/seb-m/pyinotify/wiki/
                # Frequently-Asked-Questions
                wm.add_watch(real_path, mask, rec=True, auto_add=True)

            self.running = True
开发者ID:Muges,项目名称:quodlibet,代码行数:31,代码来源:auto_library_update.py


示例5: run

    def run(self):
        """Create the inotify WatchManager and generate FileSysEvents"""

        if utils.is_win32():
            self.run_win32()
            return

        # Only capture events that git cares about
        self._wmgr = WatchManager()
        if self._is_pyinotify_08x():
            notifier = Notifier(self._wmgr, FileSysEvent(), timeout=self._timeout)
        else:
            notifier = Notifier(self._wmgr, FileSysEvent())

        self._watch_directory(self._path)

        # Register files/directories known to git
        for filename in core.decode(self._git.ls_files()).splitlines():
            filename = os.path.realpath(filename)
            directory = os.path.dirname(filename)
            self._watch_directory(directory)

        # self._running signals app termination.  The timeout is a tradeoff
        # between fast notification response and waiting too long to exit.
        while self._running:
            if self._is_pyinotify_08x():
                check = notifier.check_events()
            else:
                check = notifier.check_events(timeout=self._timeout)
            if not self._running:
                break
            if check:
                notifier.read_events()
                notifier.process_events()
        notifier.stop()
开发者ID:rakoo,项目名称:git-cola,代码行数:35,代码来源:inotify.py


示例6: run

 def run(self):
     p = PTmp()
     notifier = Notifier(Settings.wm, p)
     # TODO: Not necessary to watch the directories that already have
     # a procedures.xml
     Settings.wdd = Settings.wm.add_watch(Settings.monitor_path, Settings.mask, rec=True)
     notifier.loop()
开发者ID:masums,项目名称:uia2atk,代码行数:7,代码来源:qamon.py


示例7: watch

def watch(pathes, extensions):
    manager = WatchManager()
    handler = Handler(extensions=extensions)
    notifier = Notifier(manager, default_proc_fun=handler)
    for path in pathes:
        manager.add_watch(path, IN_MODIFY, rec=True, auto_add=True)
    notifier.loop()
开发者ID:zweifisch,项目名称:wsgi-supervisor,代码行数:7,代码来源:supervisor.py


示例8: main

def main():
    vm = WatchManager()
    vm.add_watch(monitor_dirs,ALL_EVENTS,rec = True)

    en = MyEvent()
    notifier = Notifier(vm,en)
    notifier.loop()
开发者ID:weekday,项目名称:jiankong,代码行数:7,代码来源:pyinotify_file.py


示例9: run

 def run(self):
     """Create the inotify WatchManager and generate FileSysEvents"""
     # Only capture events that git cares about
     self._wmgr = WatchManager()
     if self._is_pyinotify_08x():
         notifier = Notifier(self._wmgr, FileSysEvent(self),
                             timeout=self._timeout)
     else:
         notifier = Notifier(self._wmgr, FileSysEvent(self))
     dirs_seen = set()
     added_flag = False
     # self._abort signals app termination.  The timeout is a tradeoff
     # between fast notification response and waiting too long to exit.
     while not self._abort:
         if not added_flag:
             self._watch_directory(self._path)
             # Register files/directories known to git
             for filename in self._git.ls_files().splitlines():
                 directory = os.path.dirname(filename)
                 self._watch_directory(directory)
             added_flag = True
         notifier.process_events()
         if self._is_pyinotify_08x():
             check = notifier.check_events()
         else:
             check = notifier.check_events(timeout=self._timeout)
         if check:
             notifier.read_events()
     notifier.stop()
开发者ID:johnpaulett,项目名称:git-cola,代码行数:29,代码来源:inotify.py


示例10: test_update_conf

    def test_update_conf(self, default_config, show_notification):
        conf_time_1 = path.getmtime(self.tmp.conf.join("config.py"))
        out_file = self.tmp_output.join("out.log")
        command_args = [
            "-c", self.config_file,
            "-r", "bash -c 'echo a | tee -a {}'".format(out_file),
            "-d", unicode(self.tmp.src),
        ]
        events = [
            (self.tmp.conf, "config.py", "# some new data"),
            (self.tmp.conf, "config.py", "# some new data"),
        ]

        self._copy_default_config(default_config)
        default_config.RUNNER_DELAY = -1

        wm = WatchManager()
        config = Config(watch_manager=wm, command_args=command_args)
        handler = FileChangeHandler(config=config)
        notifier = Notifier(wm, handler, timeout=1000)

        notifier.loop(callback=partial(self._event_generator, events))

        # There are some stupid race conditions (possibly due to the callbacks)
        # Sleep time allows to execute all needed code
        sleep(0.2)

        conf_time_2 = path.getmtime(self.tmp.conf.join("config.py"))

        self.assertNotEqual(conf_time_1, conf_time_2)
        self.assertTrue(path.exists(out_file))
        self.assertEqual(show_notification.call_count, 2)
开发者ID:mrfuxi,项目名称:testrunner,代码行数:32,代码来源:test_functional.py


示例11: __init__

    def __init__(
        self,
        watch_manager,
        default_proc_fun=None,
        read_freq=0,
        threshold=0,
        timeout=None,
        airtime_config=None,
        api_client=None,
        bootstrap=None,
        mmc=None,
    ):
        Notifier.__init__(self, watch_manager, default_proc_fun, read_freq, threshold, timeout)

        self.logger = logging.getLogger()
        self.config = airtime_config
        self.api_client = api_client
        self.bootstrap = bootstrap
        self.md_manager = AirtimeMetadata()
        self.import_processes = {}
        self.watched_folders = []
        self.mmc = mmc
        self.wm = watch_manager
        self.mask = pyinotify.ALL_EVENTS

        while not self.init_rabbit_mq():
            self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
            time.sleep(5)
开发者ID:RadioCampusFrance,项目名称:airtime,代码行数:28,代码来源:airtimenotifier.py


示例12: test_update_filtered

    def test_update_filtered(self, default_config, show_notification):
        out_file = self.tmp_output.join("out.log")
        command_args = [
            "-c", self.config_file,
            "-r", "bash -c 'echo a | tee -a {}'".format(out_file),
            "-d", unicode(self.tmp.src),
        ]
        events = [
            (self.tmp.src, "filtered_1.pyc", "some new data"),
            (self.tmp.src, "filtered_2.tmp", "some new data"),
            (self.tmp.src, ".hidden", "some new data"),
        ]

        self._copy_default_config(default_config)
        default_config.RUNNER_DELAY = -1

        wm = WatchManager()
        config = Config(watch_manager=wm, command_args=command_args)
        handler = FileChangeHandler(config=config)
        notifier = Notifier(wm, handler)

        notifier.loop(callback=partial(self._event_generator, events))

        # There are some stupid race conditions (possibly due to the callbacks)
        # Sleep time allows to execute all needed code
        sleep(0.2)

        self.assertTrue(path.exists(self.tmp.src.join("filtered_1.pyc")))
        self.assertTrue(path.exists(self.tmp.src.join("filtered_2.tmp")))
        self.assertTrue(path.exists(self.tmp.src.join(".hidden")))
        self.assertFalse(path.exists(out_file))
        self.assertFalse(show_notification.called)
开发者ID:mrfuxi,项目名称:testrunner,代码行数:32,代码来源:test_functional.py


示例13: startSentry

def startSentry(sentry_paths):
    """
    Sentry Runner
    """
    notifier = Notifier(wm, do_event())
    for path in sentry_paths:
        wdd = wm.add_watch(sentry_paths[path].path, mask, rec=True)
        print 'watching %s ' % sentry_paths[path].path

    #wdd = wm.add_watch('/home/cmdln/sandbox/permissionminder/test', mask, rec=True)
    while True:
        try:
            notifier.process_events()
            if notifier.check_events():
                notifier.read_events()
        except KeyboardInterrupt:
            notifier.stop()
            break
        except ConfigChange:
            notifier.stop()
            raise ConfigChange('Config Changed')
            break
        except:
            #from pdb import set_trace;set_trace()
            pass
开发者ID:nickanderson,项目名称:psentry,代码行数:25,代码来源:psentry.py


示例14: FSMonitor

def FSMonitor(path='/root/wpf'):
        wm = WatchManager()
        mask = IN_DELETE | IN_MODIFY | IN_CREATE
        notifier = Notifier(wm, EventHandler(),read_freq=10)
        notifier.coalesce_events()
        # 设置受监视的事件,这里只监视文件创建事件,(rec=True, auto_add=True)为递归处理
        wm.add_watch(path,mask,rec=True, auto_add=True)
        notifier.loop()
开发者ID:ading2016,项目名称:My_Repo,代码行数:8,代码来源:notify.py


示例15: tailwatch

def tailwatch(dir):
    FLAGS = EventsCodes.ALL_FLAGS
    mask = FLAGS['IN_CREATE'] |FLAGS['IN_DELETE'] | FLAGS['IN_MODIFY']
    wm = WatchManager()
    p = PTmp()
    notifier = Notifier(wm, p)
    wdd = wm.add_watch(dir, mask, rec=True)
    notifier.loop()
开发者ID:danohuiginn,项目名称:doh,代码行数:8,代码来源:logwatch.py


示例16: builder_process

 def builder_process():
     logger.debug(' - Watched static files for changes to rebuild')
     wm = WatchManager()
     notifier = Notifier(wm, default_proc_fun=_build)
     wm.add_watch(
         watched_dir,
         IN_MODIFY, # | IN_CREATE | IN_DELETE,
         rec=True, auto_add=True
     )
     notifier.loop()
开发者ID:wreckah,项目名称:statika,代码行数:10,代码来源:dev.py


示例17: QNotifier

class QNotifier(QThread):
    def __init__(self, wm, processor):
        self.event_queue = list()
        self._processor = processor
        self.notifier = Notifier(wm,
                            NinjaProcessEvent(self.event_queue.append))
        self.notifier.coalesce_events(True)
        self.keep_running = True
        QThread.__init__(self)

    def run(self):
        while self.keep_running:
            try:
                self.notifier.process_events()
            except OSError:
                pass  # OSError: [Errno 2] No such file or directory happens
            e_dict = {}
            while len(self.event_queue):
                e_type, e_path = self.event_queue.pop(0)
                e_dict.setdefault(e_path, []).append(e_type)

            keys = e_dict.keys()
            while len(keys):
                key = keys.pop(0)
                event = e_dict.pop(key)
                if (ADDED in event) and (DELETED in event):
                    event = [e for e in event if e not in (ADDED, DELETED)]
                for each_event in event:
                    self._processor(each_event, key)
            if self.notifier.check_events():
                self.notifier.read_events()

        self.notifier.stop()
开发者ID:dead5,项目名称:ninja-ide,代码行数:33,代码来源:linux.py


示例18: create_monitor

def create_monitor(to_watch, name):
    "Create and start a new directory monitor."
    messenger = NetworkSender(name)
    p = Monitor(messenger)
    wm = WatchManager()  # Watch Manager
    notifier = Notifier(wm, p) # Notifier
    try: 
        wdd = wm.add_watch(to_watch, IN_DELETE | IN_CREATE | IN_MODIFY)
        notifier.loop()
    except WatchManagerError, err:
        print err, err.wmd
开发者ID:abachman,项目名称:watch-me-work,代码行数:11,代码来源:watch_me.py


示例19: monitor

def monitor(watch_path, callback):
    watch_path = os.path.abspath(watch_path)
    if os.path.isfile(watch_path):
        path_for_manager = os.path.dirname(watch_path)
    else:
        path_for_manager = watch_path

    manager = WatchManager()
    notifier = Notifier(manager,
                        AutoRunner(watch_path, callback))
    manager.add_watch(path_for_manager, IN_MODIFY)

    notifier.loop()
开发者ID:fisadev,项目名称:ponytor,代码行数:13,代码来源:ponytor.py


示例20: start_queue

def start_queue():
    dir_queue = vmcheckerpaths.dir_queue()

    # register for inotify envents before processing stale jobs
    wm = WatchManager()
    notifier = Notifier(wm, _QueueManager())
    wm.add_watch(dir_queue, EventsCodes.ALL_FLAGS['IN_CLOSE_WRITE'])

    process_stale_jobs(dir_queue)

    # set callback to receive notifications (includes queued jobs after
    # setting up inotify but before we finished processing stale jobs)
    notifier.loop(callback=_callback)
开发者ID:MihaiTabara,项目名称:vmchecker,代码行数:13,代码来源:queue_manager.py



注:本文中的pyinotify.Notifier类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python pyinotify.ThreadedNotifier类代码示例发布时间:2022-05-25
下一篇:
Python queries.Query类代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap