• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python test_base.expectTrue函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中test_base.expectTrue函数的典型用法代码示例。如果您正苦于以下问题:Python expectTrue函数的具体用法?Python expectTrue怎么用?Python expectTrue使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了expectTrue函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_2_daemon_with_option

    def test_2_daemon_with_option(self):
        logger_path = test_base.getTestDirectory(test_base.CONFIG_DIR)
        daemon = self._run_daemon(
            {
                "disable_watchdog": True,
                "disable_extensions": True,
                "disable_logging": False,
            },
            options_only={
                "logger_path": logger_path,
                "verbose": True,
            })

        self.assertTrue(daemon.isAlive())

        def info_exists():
            info_path = test_base.getLatestInfoLog(logger_path)
            return os.path.exists(info_path)

        # Wait for the daemon to flush to GLOG.
        test_base.expectTrue(info_exists)

        # Assign the variable after we have assurances it exists
        info_path = test_base.getLatestInfoLog(logger_path)
        self.assertTrue(os.path.exists(info_path))

        # Lastly, verify that we have permission to read the file
        data = ''
        with open(info_path, 'r') as fh:
            try:
                data = fh.read()
            except:
                pass
        self.assertTrue(len(data) > 0)
        daemon.kill()
开发者ID:Centurion89,项目名称:osquery,代码行数:35,代码来源:test_osqueryd.py


示例2: test_3_example_extension

    def test_3_example_extension(self):
        daemon = self._run_daemon({"disable_watchdog": True})
        self.assertTrue(daemon.isAlive())

        # Get a python-based thrift client
        client = EXClient(daemon.options["extensions_socket"])
        test_base.expectTrue(client.open)
        self.assertTrue(client.open())
        em = client.getEM()

        # Make sure there are no extensions registered
        result = test_base.expect(em.extensions, 0)
        self.assertEqual(len(result), 0)

        # Make sure the extension process starts
        extension = self._run_extension(path=daemon.options["extensions_socket"])
        self.assertTrue(extension.isAlive())

        # Now that an extension has started, check extension list
        result = test_base.expect(em.extensions, 1)
        self.assertEqual(len(result), 1)
        ex_uuid = result.keys()[0]
        ex_data = result[ex_uuid]
        self.assertEqual(ex_data.name, "example")
        self.assertEqual(ex_data.version, "0.0.1")
        self.assertEqual(ex_data.min_sdk_version, "0.0.0")

        # Get a python-based thrift client to the extension's service
        client2 = EXClient(daemon.options["extensions_socket"], uuid=ex_uuid)
        client2.open()
        ex = client2.getEX()
        self.assertEqual(ex.ping().code, 0)

        # Make sure the extension can receive a call
        em_time = em.call("table", "time", {"action": "columns"})
        ex_time = ex.call("table", "time", {"action": "columns"})
        print (em_time)
        print (ex_time)
        self.assertEqual(ex_time.status.code, 0)
        self.assertTrue(len(ex_time.response) > 0)
        self.assertTrue("name" in ex_time.response[0])
        self.assertEqual(ex_time.status.uuid, ex_uuid)

        # Make sure the extension includes a custom registry plugin
        result = ex.call("table", "example", {"action": "generate"})
        print (result)
        self.assertEqual(result.status.code, 0)
        self.assertEqual(len(result.response), 1)
        self.assertTrue("example_text" in result.response[0])
        self.assertTrue("example_integer" in result.response[0])
        self.assertEqual(result.response[0]["example_text"], "example")

        # Make sure the core can route to the extension
        result = em.call("table", "example", {"action": "generate"})
        print (result)

        client2.close()
        client.close()
        extension.kill()
        daemon.kill()
开发者ID:imaxxs,项目名称:osquery,代码行数:60,代码来源:test_extensions.py


示例3: test_3_daemon_lost_worker

    def test_3_daemon_lost_worker(self):
        # Test that killed workers are respawned by the watcher
        if os.environ.get('SANITIZE') is not None:
            return
        daemon = self._run_daemon({
            "allow_unsafe": True,
            "disable_watchdog": False,
            "ephemeral": True,
            "disable_database": True,
            "disable_logging": True,
        })
        self.assertTrue(daemon.isAlive())

        # Check that the daemon spawned a child process
        children = daemon.getChildren()
        self.assertTrue(len(children) > 0)

        # Kill only the child worker
        os.kill(children[0], signal.SIGINT)
        self.assertTrue(daemon.isDead(children[0]))
        self.assertTrue(daemon.isAlive())

        # Expect the children of the daemon to be respawned
        def waitDaemonChildren():
            children = daemon.getChildren()
            return len(children) > 0
        test_base.expectTrue(waitDaemonChildren)
        children = daemon.getChildren()
        self.assertTrue(len(children) > 0)
开发者ID:Centurion89,项目名称:osquery,代码行数:29,代码来源:test_osqueryd.py


示例4: test_9_external_config_update

    def test_9_external_config_update(self):
        # Start an extension without a daemon, with a timeout.
        extension = self._run_extension(timeout=EXTENSION_TIMEOUT)
        self.assertTrue(extension.isAlive())

        # Now start a daemon
        daemon = self._run_daemon({
            "disable_watchdog": True,
            "extensions_timeout": EXTENSION_TIMEOUT,
            "extensions_socket": extension.options["extensions_socket"],
        })
        self.assertTrue(daemon.isAlive())

        # Get a python-based thrift client to the manager and extension.
        client = test_base.EXClient(extension.options["extensions_socket"])
        test_base.expectTrue(client.open)
        self.assertTrue(client.open(timeout=EXTENSION_TIMEOUT))
        em = client.getEM()

        # Need the manager to request the extension's UUID.
        result = test_base.expect(em.extensions, 1)
        self.assertTrue(result is not None)
        ex_uuid = result.keys()[0]
        client2 = test_base.EXClient(extension.options["extensions_socket"],
<<<<<<< HEAD
                                     uuid=ex_uuid)
开发者ID:pathcl,项目名称:osquery,代码行数:26,代码来源:test_extensions.py


示例5: test_6_logger_mode

    def test_6_logger_mode(self):
        logger_path = os.path.join(test_base.CONFIG_DIR, "logger-mode-tests")
        os.makedirs(logger_path)

        test_mode = 0754  # Strange mode that should never exist
        daemon = self._run_daemon(
            {"disable_watchdog": True, "disable_extensions": True, "disable_logging": False},
            options_only={"logger_path": logger_path, "logger_mode": test_mode, "verbose": True},
        )
        info_path = os.path.join(logger_path, "osqueryd.INFO")
        self.assertTrue(daemon.isAlive())

        def info_exists():
            return os.path.exists(info_path)

        # Wait for the daemon to flush to GLOG.
        test_base.expectTrue(info_exists)

        # Both log files should exist and have the given mode.
        for fname in ["osqueryd.INFO", "osqueryd.results.log"]:
            pth = os.path.join(logger_path, fname)
            self.assertTrue(os.path.exists(pth))

            rpath = os.path.realpath(info_path)
            mode = os.stat(rpath).st_mode & 0777
            self.assertEqual(mode, test_mode)

        daemon.kill()
开发者ID:rgarbina,项目名称:osquery,代码行数:28,代码来源:test_osqueryd.py


示例6: test_2_daemon_api

    def test_2_daemon_api(self):
        daemon = self._run_daemon({"disable_watchdog": True})
        self.assertTrue(daemon.isAlive())

        # Get a python-based thrift client
        client = EXClient(daemon.options["extensions_socket"])
        test_base.expectTrue(client.open)
        self.assertTrue(client.open())
        em = client.getEM()

        # List the number of extensions
        print (em.ping())
        result = test_base.expect(em.extensions, 0)
        self.assertEqual(len(result), 0)

        # Try the basic ping API
        self.assertEqual(em.ping().code, 0)

        # Try a query
        response = em.query("select * from time")
        self.assertEqual(response.status.code, 0)
        self.assertEqual(len(response.response), 1)
        self.assertTrue("seconds" in response.response[0].keys())

        # Try to get the query columns
        response = em.getQueryColumns("select seconds as s from time")
        self.assertEqual(response.status.code, 0)
        self.assertEqual(len(response.response), 1)
        self.assertTrue("s" in response.response[0])
        client.close()
        daemon.kill()
开发者ID:imaxxs,项目名称:osquery,代码行数:31,代码来源:test_extensions.py


示例7: test_5_extension_timeout

    def test_5_extension_timeout(self):
        # Start an extension without a daemon, with a timeout.
        extension = self._run_extension(timeout=EXTENSION_TIMEOUT)
        self.assertTrue(extension.isAlive())

        # Now start a daemon
        daemon = self._run_daemon({
            "disable_watchdog": True,
            "extensions_socket": extension.options["extensions_socket"],
            "verbose": True,
        })
        self.assertTrue(daemon.isAlive())

        # Get a python-based thrift client
        client = test_base.EXClient(extension.options["extensions_socket"])
        test_base.expectTrue(client.open)
        self.assertTrue(client.open(timeout=EXTENSION_TIMEOUT))
        em = client.getEM()

        # The waiting extension should have connected to the daemon.
        result = test_base.expect(em.extensions, 1)
        self.assertEqual(len(result), 1)

        client.close()
        daemon.kill(True)
        extension.kill()
开发者ID:adityavs,项目名称:osquery,代码行数:26,代码来源:test_extensions.py


示例8: test_6_logger_mode

    def test_6_logger_mode(self):
        logger_path = test_base.getTestDirectory(test_base.CONFIG_DIR)
        test_mode = 0754        # Strange mode that should never exist
        daemon = self._run_daemon({
            "disable_watchdog": True,
            "disable_extensions": True,
            "disable_logging": False,
        },
        options_only={
            "logger_path": logger_path,
            "logger_mode": test_mode,
            "verbose": True,
        })
        info_path = os.path.join(logger_path, "osqueryd.INFO")
        self.assertTrue(daemon.isAlive())

        def info_exists():
            return os.path.exists(info_path)
        # Wait for the daemon to flush to GLOG.
        test_base.expectTrue(info_exists)

        # Both log files should exist, the results should have the given mode.
        for fname in ['osqueryd.INFO', 'osqueryd.results.log']:
            pth = os.path.join(logger_path, fname)
            self.assertTrue(os.path.exists(pth))

            # Only apply the mode checks to .log files.
            if fname.find('.log') > 0:
                rpath = os.path.realpath(pth)
                mode = os.stat(rpath).st_mode & 0777
                self.assertEqual(mode, test_mode)

        daemon.kill()
开发者ID:defaultnamehere,项目名称:osquery,代码行数:33,代码来源:test_osqueryd.py


示例9: tearDown

    def tearDown(self):
        if os.path.exists(self.tmp_dir):
            shutil.rmtree(self.tmp_dir)

        # Ensure that even if events fail we always remove the services
        if len(self.service_list_) > 0:
            for s in self.service_list_:
                stopService(s)
                test_base.expectTrue(serviceDead)
                uninstallService(s)
开发者ID:JonathanNogueira,项目名称:osquery,代码行数:10,代码来源:test_windows_service.py


示例10: setUp

    def setUp(self):
        self.daemon = self._run_daemon({
            # The set of queries will hammer the daemon process.
            "disable_watchdog": True,
        })
        self.assertTrue(self.daemon.isAlive())

        # The sets of example tests will use the extensions API.s
        self.client = test_base.EXClient(self.daemon.options["extensions_socket"])
        test_base.expectTrue(self.client.open)
        self.assertTrue(self.client.open())
        self.em = self.client.getEM()
开发者ID:151706061,项目名称:osquery,代码行数:12,代码来源:test_example_queries.py


示例11: test_query_packs

    def test_query_packs(self):
        query_pack_path = test_base.CONFIG_DIR + "/test_pack.conf"
        utils.write_config({
            "queries": {
                "simple_test": {
                    "query": "select * from time",
                    "interval": 60,
                },
                "simple_test2": {
                    "query": "select * from time",
                    "interval": 60,
                    "platform": "does_not_exist",
                }
            }
        }, path=query_pack_path)

        # Get a daemon process, loaded with the default test configuration.
        # We'll add a config override (overwrite) for the "packs" key.
        # THis will point a single pack at the config written above.
        daemon = self._run_daemon({
            "disable_watchdog": True,
            },
            overwrite={
            "packs": {
                "test_pack": query_pack_path
            },
        })
        self.assertTrue(daemon.isAlive())

        # Introspect into the daemon's query packs.
        client = test_base.EXClient(daemon.options["extensions_socket"])
        test_base.expectTrue(client.open)
        self.assertTrue(client.open())
        em = client.getEM()

        # Every query from the pack(s) is added to the packs table.
        def get_packs():
            result = em.query("select * from osquery_packs")
            return len(result.response) == 2
        # Allow the daemon some lag to parse the pack content.
        test_base.expectTrue(get_packs)
        result = em.query("select * from osquery_packs")
        self.assertEqual(len(result.response), 2)

        # Only the applicable queries are added to the schedule.
        # There will be len(pack_queries) - 1 since "simple_test2" is bound
        # to an unknown/non-existing platform.
        result = em.query("select * from osquery_schedule")
        self.assertEqual(len(result.response), 1)
        daemon.kill()
开发者ID:1514louluo,项目名称:osquery,代码行数:50,代码来源:test_additional.py


示例12: setUp

    def setUp(self):
        self.daemon = self._run_daemon({
            # The set of queries will hammer the daemon process.
            "disable_watchdog": True,
            # Enable the 'hidden' flag "registry_exceptions" to prevent catching.
            "registry_exceptions": True,
        })
        self.assertTrue(self.daemon.isAlive())

        # The sets of example tests will use the extensions APIs.
        self.client = test_base.EXClient(self.daemon.options["extensions_socket"])
        test_base.expectTrue(self.client.open)
        self.assertTrue(self.client.open())
        self.em = self.client.getEM()
开发者ID:runt18,项目名称:osquery,代码行数:14,代码来源:test_example_queries.py


示例13: cleanOsqueryServices

def cleanOsqueryServices():
    service_args = POWERSHELL_ARGS + ['$(Get-Service osqueryd_test_*).Name']
    services = subprocess.check_output(service_args).split()
    
    # No services found on the system
    if len(services) == 0:
        return
    
    for service in services:
        stopService(service)
        # Local workaround as we need the service name
        def isServiceStopped():
            return serviceStopped(service)
        test_base.expectTrue(isServiceStopped)
        uninstallService(service)
开发者ID:JonathanNogueira,项目名称:osquery,代码行数:15,代码来源:test_windows_service.py


示例14: test_4_extension_dies

    def test_4_extension_dies(self):
        daemon = self._run_daemon({"disable_watchdog": True})
        self.assertTrue(daemon.isAlive())

        # Get a python-based thrift client
        client = test_base.EXClient(daemon.options["extensions_socket"])
        test_base.expectTrue(client.open)
        self.assertTrue(client.open())
        em = client.getEM()

        # Make sure there are no extensions registered
        result = test_base.expect(em.extensions, 0)
        self.assertEqual(len(result), 0)

        # Make sure the extension process starts
        extension = self._run_extension(
            path=daemon.options["extensions_socket"])
        self.assertTrue(extension.isAlive())

        # Now that an extension has started, check extension list
        result = test_base.expect(em.extensions, 1)
        self.assertEqual(len(result), 1)

        # Kill the extension
        extension.kill()

        # Make sure the daemon detects the change
        result = test_base.expect(em.extensions, 0, timeout=5)
        self.assertEqual(len(result), 0)

        # Make sure the extension restarts
        extension = self._run_extension(
            path=daemon.options["extensions_socket"])
        self.assertTrue(extension.isAlive())

        # With the reset there should be 1 extension again
        result = test_base.expect(em.extensions, 1)
        self.assertEqual(len(result), 1)
        print (em.query("select * from example"))

        # Now tear down the daemon
        client.close()
        daemon.kill()

        # The extension should tear down as well
        self.assertTrue(extension.isDead(extension.pid))
开发者ID:151706061,项目名称:osquery,代码行数:46,代码来源:test_extensions.py


示例15: test_2_daemon_with_option

    def test_2_daemon_with_option(self):
        logger_path = os.path.join(test_base.CONFIG_DIR, "logger-tests")
        os.makedirs(logger_path)
        daemon = self._run_daemon(
            {"disable_watchdog": True, "disable_extensions": True, "disable_logging": False},
            options_only={"logger_path": logger_path, "verbose": True},
        )
        info_path = os.path.join(logger_path, "osqueryd.INFO")
        self.assertTrue(daemon.isAlive())

        def info_exists():
            return os.path.exists(info_path)

        # Wait for the daemon to flush to GLOG.
        test_base.expectTrue(info_exists)
        self.assertTrue(os.path.exists(info_path))
        daemon.kill()
开发者ID:Ramsey16,项目名称:osquery,代码行数:17,代码来源:test_osqueryd.py


示例16: test_9_external_config_update

    def test_9_external_config_update(self):
        # Start an extension without a daemon, with a timeout.
        extension = self._run_extension(timeout=EXTENSION_TIMEOUT)
        self.assertTrue(extension.isAlive())

        # Now start a daemon
        daemon = self._run_daemon({
            "disable_watchdog": True,
            "extensions_timeout": EXTENSION_TIMEOUT,
            "extensions_socket": extension.options["extensions_socket"],
        })
        self.assertTrue(daemon.isAlive())

        # Get a python-based thrift client to the manager and extension.
        client = test_base.EXClient(extension.options["extensions_socket"])
        test_base.expectTrue(client.open)
        self.assertTrue(client.open(timeout=EXTENSION_TIMEOUT))
        em = client.getEM()

        # Need the manager to request the extension's UUID.
        result = test_base.expect(em.extensions, 1)
        self.assertTrue(result is not None)
        ex_uuid = result.keys()[0]
        client2 = test_base.EXClient(extension.options["extensions_socket"],
            uuid=ex_uuid)
        test_base.expectTrue(client2.open)
        self.assertTrue(client2.open(timeout=EXTENSION_TIMEOUT))
        ex = client2.getEX()

        # Trigger an async update from the extension.
        request = {
            "action": "update",
            "source": "test",
            "data": "{\"options\": {\"config_plugin\": \"update_test\"}}"}
        ex.call("config", "example", request)

        # The update call in the extension should filter to the core.
        options = em.options()
        self.assertTrue("config_plugin" in options.keys())
        self.assertTrue(options["config_plugin"], "update_test")

        # Cleanup thrift connections and subprocesses.
        client2.close()
        client.close()
        extension.kill()
        daemon.kill()
开发者ID:adityavs,项目名称:osquery,代码行数:46,代码来源:test_extensions.py


示例17: test_7_extensions_autoload_watchdog

    def test_7_extensions_autoload_watchdog(self):
        loader = test_base.Autoloader(
            [test_base.ARGS.build + "/osquery/example_extension.ext"])
        daemon = self._run_daemon({"extensions_autoload": loader.path})
        self.assertTrue(daemon.isAlive())

        # Get a python-based thrift client
        client = EXClient(daemon.options["extensions_socket"])
        test_base.expectTrue(client.open)
        self.assertTrue(client.open())
        em = client.getEM()

        # The waiting extension should have connected to the daemon.
        result = test_base.expect(em.extensions, 1)
        self.assertEqual(len(result), 1)

        client.close()
        daemon.kill(True)
开发者ID:imaxxs,项目名称:osquery,代码行数:18,代码来源:test_extensions.py


示例18: test_6_logger_mode

    def test_6_logger_mode(self):
        logger_path = test_base.getTestDirectory(test_base.CONFIG_DIR)
        test_mode = 0754  # Strange mode that should never exist
        daemon = self._run_daemon(
            {
                "disable_watchdog": True,
                "disable_extensions": True,
                "disable_logging": False,
            },
            options_only={
                "logger_path": logger_path,
                "logger_mode": test_mode,
                "verbose": True,
            })

        results_path = os.path.join(logger_path, "osqueryd.results.log")
        self.assertTrue(daemon.isAlive())

        # Wait for the daemon to write the info log to disk before continuing
        def info_exists():
            info_path = test_base.getLatestInfoLog(logger_path)
            return os.path.exists(info_path)
        info_path = test_base.getLatestInfoLog(logger_path)

        def results_exists():
            return os.path.exists(results_path)

        # Wait for the daemon to flush to GLOG.
        test_base.expectTrue(info_exists)
        test_base.expectTrue(results_exists)

        # Both log files should exist, the results should have the given mode.
        for pth in [info_path, results_path]:
            self.assertTrue(os.path.exists(pth))

            # Only apply the mode checks to .log files.
            # TODO: Add ACL checks for Windows logs
            if pth.find('.log') > 0 and os.name != "nt":
                rpath = os.path.realpath(pth)
                mode = os.stat(rpath).st_mode & 0777
                self.assertEqual(mode, test_mode)

        daemon.kill()
开发者ID:Centurion89,项目名称:osquery,代码行数:43,代码来源:test_osqueryd.py


示例19: test_6_extensions_autoload

    def test_6_extensions_autoload(self):
        loader = test_base.Autoloader("/tmp/osqueryd-temp-ext.load",
            [test_base.ARGS.build + "/osquery/example_extension.ext"])
        daemon = self._run_daemon({
            "disable_watchdog": True,
            "extensions_autoload": loader.path,
        })
        self.assertTrue(daemon.isAlive())

        # Get a python-based thrift client
        client = EXClient()
        test_base.expectTrue(client.open)
        self.assertTrue(client.open())
        em = client.getEM()

        # The waiting extension should have connected to the daemon.
        result = test_base.expect(em.extensions, 1)
        self.assertEqual(len(result), 1)

        client.close()
        daemon.kill(True)
开发者ID:jreese,项目名称:osquery,代码行数:21,代码来源:test_extensions.py


示例20: test_1_install_run_stop_uninstall_windows_service

    def test_1_install_run_stop_uninstall_windows_service(self):
        name = 'osqueryd_test_{}'.format(self.test_instance)
        code, _ = installService(name, self.bin_path)
        self.assertEqual(code, 0)
        self.service_list_.append(name)

        code = startService(name, '--flagfile', self.flagfile)
        self.assertEqual(code, 0)

        # Ensure the service is online before proceeding
        test_base.expectTrue(serviceAlive)

        _, output = queryService(name)
        self.assertEqual(output, '4RUNNING')

        # The daemon should not be able to load if the service is running
        _, stderr = self.runDaemon(
            '--allow_unsafe', '--verbose', '--config_path', self.config_path,
            '--database_path', self.database_path, '--logger_path',
            self.log_path, '--pidfile', self.pidfile)

        self.assertNotEqual(stderr.find('is already running'), -1)

        if code == 0:
            code = stopService(name)
            self.assertEqual(code, 0)

        test_base.expectTrue(serviceDead)
        self.assertTrue(serviceDead())

        _, output = queryService(name)
        self.assertEqual(output, '1STOPPED')
        code, _ = uninstallService(name)
        self.assertEqual(code, 0)

        self.service_list_.remove(name)

        # Make sure the service no longer exists, error code 1060
        code, _ = queryService(name)
        self.assertEqual(code, 1060)
开发者ID:JonathanNogueira,项目名称:osquery,代码行数:40,代码来源:test_windows_service.py



注:本文中的test_base.expectTrue函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python test_base.load_json函数代码示例发布时间:2022-05-27
下一篇:
Python test_base.expect函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap