• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python moto.mock_s3函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中moto.mock_s3函数的典型用法代码示例。如果您正苦于以下问题:Python mock_s3函数的具体用法?Python mock_s3怎么用?Python mock_s3使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了mock_s3函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_get_canonical_ids

    def test_get_canonical_ids(self):
        accounts = Account.query.all()
        get_canonical_ids(accounts)

        for account in accounts:
            assert len(account.custom_fields) == 1
            assert account.custom_fields[0].name == "canonical_id"
            assert account.custom_fields[0].value == "bcaf1ffd86f41161ca5fb16fd081034f"  # Default from moto.

            # Make it something else to test overrides:
            account.custom_fields[0].value = "replaceme"
            db.session.add(account)

        db.session.commit()

        # Test without override (nothing should be changed):
        get_canonical_ids(accounts)
        for account in accounts:
            assert len(account.custom_fields) == 1
            assert account.custom_fields[0].name == "canonical_id"
            assert account.custom_fields[0].value == "replaceme"

        # Test override:
        get_canonical_ids(accounts, override=True)
        for account in accounts:
            assert len(account.custom_fields) == 1
            assert account.custom_fields[0].name == "canonical_id"
            assert account.custom_fields[0].value == "bcaf1ffd86f41161ca5fb16fd081034f"  # Default from moto.

        mock_sts().stop()
        mock_s3().stop()
开发者ID:DataDog,项目名称:security_monkey,代码行数:31,代码来源:test_s3_canonical.py


示例2: s3_resource

def s3_resource(tips_file):
    pytest.importorskip('s3fs')
    moto.mock_s3().start()

    test_s3_files = [
        ('tips.csv', tips_file),
        ('tips.csv.gz', tips_file + '.gz'),
        ('tips.csv.bz2', tips_file + '.bz2'),
    ]

    def add_tips_files(bucket_name):
        for s3_key, file_name in test_s3_files:
            with open(file_name, 'rb') as f:
                conn.Bucket(bucket_name).put_object(
                    Key=s3_key,
                    Body=f)

    boto3 = pytest.importorskip('boto3')
    # see gh-16135
    bucket = 'pandas-test'

    conn = boto3.resource("s3", region_name="us-east-1")
    conn.create_bucket(Bucket=bucket)
    add_tips_files(bucket)

    conn.create_bucket(Bucket='cant_get_it', ACL='private')
    add_tips_files('cant_get_it')

    yield conn

    moto.mock_s3().stop()
开发者ID:Axik,项目名称:pandas,代码行数:31,代码来源:test_network.py


示例3: pre_test_setup

    def pre_test_setup(self):
        account_type_result = AccountType.query.filter(AccountType.name == 'AWS').first()
        if not account_type_result:
            account_type_result = AccountType(name='AWS')
            db.session.add(account_type_result)
            db.session.commit()

        self.account = Account(identifier="012345678910", name="testing",
                               account_type_id=account_type_result.id)
        self.technology = Technology(name="s3")
        self.item = Item(region="us-west-2", name="somebucket",
                         arn="arn:aws:s3:::somebucket", technology=self.technology,
                         account=self.account)

        db.session.add(self.account)
        db.session.add(self.technology)
        db.session.add(self.item)

        db.session.commit()

        mock_s3().start()
        client = boto3.client("s3")
        client.create_bucket(Bucket="somebucket")
        client.create_bucket(Bucket="someotherbucket")
        client.create_bucket(Bucket="someotherbucket2")
开发者ID:crruthe,项目名称:security_monkey,代码行数:25,代码来源:test_s3.py


示例4: setUp

 def setUp(self):
     mock_s3().start()
     patchers = [
         "autopush.main.task",
         "autopush.main.reactor",
         "autopush.settings.TwistedMetrics",
     ]
     self.mocks = {}
     for name in patchers:
         patcher = patch(name)
         self.mocks[name] = patcher.start()
开发者ID:martinthomson,项目名称:autopush,代码行数:11,代码来源:test_main.py


示例5: setUp

    def setUp(self):
        self.mock = mock_s3()
        self.mock.start()

        #
        # Populate the data in mock S3
        #
        # s3+file first
        conn = boto.connect_s3()
        b = conn.create_bucket(self.bucket_name)
        k = Key(b)
        k.name = self.key_name
        with open(test_file(self.key_name), 'rb') as f:
            k.set_contents_from_file(f)

        # s3+dir
        b = conn.create_bucket(self.dir_bucket_name)
        for fname in ('index.json', '1', '2', '3', '4', '5', '6'):
            k = Key(b)
            k.name = posixpath.join(self.dir_list_name, fname)
            with open(test_file(posixpath.join('delta_dir_source', fname)),
                      'rb') as f:
                k.set_contents_from_file(f)

        # initialize the internal list data structure via the normal method
        super(S3SourceListsTest, self).setUp()
开发者ID:rbillings,项目名称:shavar,代码行数:26,代码来源:test_lists.py


示例6: setUp

    def setUp(self):
        self.s3 = mock_s3()
        self.s3.start()
        boto = connect_s3()
        boto.create_bucket(self._bucket)

        super(MPConnectionTest, self).setUp()
开发者ID:V-Lamp,项目名称:longaccess-client,代码行数:7,代码来源:test_mpconnection.py


示例7: test_unpublish_cmd

 def test_unpublish_cmd(self):
     with mock_s3():
         conn = boto.connect_s3()
         bucket = conn.create_bucket(settings.AWS_BUCKET_NAME)
         call_command("build")
         call_command("unpublish", no_pooling=True, verbosity=3)
         self.assertFalse(list(key for key in bucket.list()))
开发者ID:ramonakira,项目名称:django-bakery,代码行数:7,代码来源:__init__.py


示例8: test_cache_control

 def test_cache_control(self):
     if not sys.version_info[:2] == (3, 4):
         from moto import mock_s3
         with mock_s3():
             # Set random max-age for various content types
             with self.settings(BAKERY_CACHE_CONTROL={
                 "application/javascript": random.randint(0, 100000),
                 "text/css": random.randint(0, 100000),
                 "text/html": random.randint(0, 100000),
             }):
                 conn = boto.connect_s3()
                 bucket = conn.create_bucket(settings.AWS_BUCKET_NAME)
                 call_command("build")
                 call_command("publish", no_pooling=True, verbosity=3)
                 for key in bucket:
                     key = bucket.get_key(key.name)
                     if key.content_type in settings.BAKERY_CACHE_CONTROL:
                         # key.cache_control returns string
                         # with "max-age=" prefix
                         self.assertIn(
                             str(settings.BAKERY_CACHE_CONTROL.get(
                                 key.content_type)),
                             key.cache_control
                         )
     else:
         self.skipTest("Moto doesn't work in Python 3.4")
开发者ID:lexieheinle,项目名称:django-bakery,代码行数:26,代码来源:__init__.py


示例9: setUp

    def setUp(self):
        self.mock_s3 = moto.mock_s3()
        self.mock_s3.start()
        self.s3_conn = boto.connect_s3()
        self.s3_conn.create_bucket('last_bucket')
        bucket = self.s3_conn.get_bucket('last_bucket')
        key = bucket.new_key('test_list/LAST')
        self.pointers = ['pointer1', 'pointer2', 'pointer3', '']
        key.set_contents_from_string('\r\n'.join(self.pointers))
        key.close()

        for key_name in POINTER_KEYS:
            key = bucket.new_key(key_name)
            out = StringIO.StringIO()
            with gzip.GzipFile(fileobj=out, mode='w') as f:
                f.write(json.dumps({'name': key_name}))
            key.set_contents_from_string(out.getvalue())
            key.close()

        self.options_prefix_pointer = {
            'bucket': 'last_bucket',
            'aws_access_key_id': 'KEY',
            'aws_secret_access_key': 'SECRET',
            'prefix_pointer': 'test_list/LAST'
        }
开发者ID:eliasdorneles,项目名称:exporters,代码行数:25,代码来源:test_readers_s3.py


示例10: test_gzip_and_send_s3

    def test_gzip_and_send_s3(self):
        """
        Tests that a gzip is made and sent to S3 and everything cleaned after
        """
        # First create some dummy content to work with

        output_path = '{0}/test_out/'.format(os.getcwd())
        helper_extract_all(cluster=self.cluster, output_path=output_path)

        with mock_s3():
            s3_resource = boto3.resource('s3')
            s3_resource.create_bucket(Bucket=self.s3_details['bucket'])

            # Run the gzip and send
            dashboard.push_to_s3(
                input_directory=output_path,
                s3_details=self.s3_details
            )

            # Check there is a gzip in the bucket
            s3_object = s3_resource.Object(
                self.s3_details['bucket'],
                'dashboard.tar.gz'
            )

            keys = s3_object.get().keys()
            self.assertTrue(
                len(keys) > 0
            )

            # Clean up files
            shutil.rmtree(output_path)
开发者ID:adsabs,项目名称:kibtools,代码行数:32,代码来源:test_dashboard.py


示例11: test_publish_cmd

 def test_publish_cmd(self):
     if not sys.version_info[:2] == (3, 4):
         from moto import mock_s3
         with mock_s3():
             conn = boto.connect_s3()
             bucket = conn.create_bucket(settings.AWS_BUCKET_NAME)
             call_command("build")
             call_command("publish", no_pooling=True, verbosity=3)
             local_file_list = []
             for (dirpath, dirnames, filenames) in os.walk(
                     settings.BUILD_DIR):
                 for fname in filenames:
                     local_key = os.path.join(
                         os.path.relpath(dirpath, settings.BUILD_DIR),
                         fname
                     )
                     if local_key.startswith('./'):
                         local_key = local_key[2:]
                     local_file_list.append(local_key)
             for key in bucket.list():
                 self.assertIn(key.name, local_file_list)
             call_command("unbuild")
             os.makedirs(settings.BUILD_DIR)
             call_command("publish", no_pooling=True, verbosity=3)
     else:
         self.skipTest("Moto doesn't work in Python 3.4")
开发者ID:lexieheinle,项目名称:django-bakery,代码行数:26,代码来源:__init__.py


示例12: mock_s3_resource

    def mock_s3_resource(self):
        mock = mock_s3()
        mock.start()

        yield mock

        mock.stop()
开发者ID:drgarcia1986,项目名称:simple-settings,代码行数:7,代码来源:test_s3.py


示例13: setUp

    def setUp(self):
        self.mock = moto.mock_s3()
        self.mock.start()
        self.conn = boto.connect_s3()
        self.conn.create_bucket(TEST_BUCKET_NAME)
        pyramid = Pyramid(stride=8)
        grid_image = os.path.join(DATA_DIRECTORY, 'grid_crop', 'grid.png')

        metatile = MetaTile(MetaTileIndex(19, 453824, 212288, 8),
                            data=open(grid_image, 'rb').read(),
                            mimetype='image/png')
        format = FormatBundle(MapType('image'), TileFormat('PNG'))

        storage = S3MetaTileStorage(levels=pyramid.levels,
                                    stride=pyramid.stride,
                                    bucket=TEST_BUCKET_NAME,
                                    prefix='testlayer',
                                    format=format)
        storage.put(metatile)

        self.node = S3StorageNode('s3', maptype='image',
                                  tileformat=dict(format='PNG'),
                                  levels=pyramid.levels,
                                  stride=pyramid.stride,
                                  bucket=TEST_BUCKET_NAME,
                                  prefix='testlayer')
        self.expected = grid_image
开发者ID:Kotaimen,项目名称:stonemason,代码行数:27,代码来源:test_storage.py


示例14: test_cache_control

    def test_cache_control(self):
        s3 = boto3.resource('s3')
        with mock_s3():
            # Set random max-age for various content types
            with self.settings(BAKERY_CACHE_CONTROL={
                "application/javascript": random.randint(0, 100000),
                "text/css": random.randint(0, 100000),
                "text/html": random.randint(0, 100000),
            }):
                self._create_bucket()
                call_command("build")
                call_command("publish", verbosity=3)

                for obj in self._get_bucket_objects():
                    s3_obj = s3.Object(
                        settings.AWS_BUCKET_NAME, obj.get('Key'))

                    if s3_obj.content_type in settings.BAKERY_CACHE_CONTROL:
                        # key.cache_control returns string
                        # with "max-age=" prefix
                        self.assertIn(
                            str(settings.BAKERY_CACHE_CONTROL.get(
                                s3_obj.content_type)),
                            s3_obj.cache_control
                        )
开发者ID:datadesk,项目名称:django-bakery,代码行数:25,代码来源:__init__.py


示例15: pre_test_setup

    def pre_test_setup(self):
        self.account_type = AccountType(name='AWS')
        db.session.add(self.account_type)
        db.session.commit()

        for x in range(0, 9):
            db.session.add(Account(name="account{}".format(x), account_type_id=self.account_type.id,
                                   identifier="01234567891{}".format(x), active=True))

        db.session.commit()

        mock_sts().start()
        mock_s3().start()

        self.s3_client = boto3.client("s3")
        self.s3_client.create_bucket(Bucket="testBucket")
开发者ID:DataDog,项目名称:security_monkey,代码行数:16,代码来源:test_s3_canonical.py


示例16: s3

def s3():
    # writable local S3 system
    m = moto.mock_s3()
    m.start()
    import boto3
    client = boto3.client('s3')
    client.create_bucket(Bucket=test_bucket_name, ACL='public-read')
    for k in [a, b, c, d]:
        try:
            client.delete_object(Bucket=test_bucket_name, Key=k)
        except:
            pass
    for flist in [files, csv_files, text_files]:
        for f, data in flist.items():
            client.put_object(Bucket=test_bucket_name, Key=f, Body=data)
    yield S3FileSystem(anon=False)
    for flist in [files, csv_files, text_files]:
        for f, data in flist.items():
            try:
                client.delete_object(Bucket=test_bucket_name, Key=f, Body=data)
            except:
                pass
    for k in [a, b, c, d]:
        try:
            client.delete_object(Bucket=test_bucket_name, Key=k)
        except:
            pass
    m.stop()
开发者ID:mheilman,项目名称:s3fs,代码行数:28,代码来源:test_s3fs.py


示例17: setUp

    def setUp(self):
        mock_s3 = moto.mock_s3()
        mock_s3.start()
        self.addCleanup(mock_s3.stop)

        self.s3 = boto.connect_s3()
        self.s3.create_bucket('test_s3_bucket')
开发者ID:m3brown,项目名称:cfgov-refresh,代码行数:7,代码来源:test_s3utils.py


示例18: setUp

    def setUp(self):
        mock_s3 = moto.mock_s3()
        mock_s3.start()
        self.addCleanup(mock_s3.stop)

        self.s3 = boto3.client('s3')
        self.s3.create_bucket(Bucket='test_s3_bucket')
开发者ID:contolini,项目名称:cfgov-refresh,代码行数:7,代码来源:test_s3utils.py


示例19: mock_aws_services

def mock_aws_services():
    mock = moto.mock_s3()
    mock.start()

    yield

    mock.stop()
开发者ID:okomestudio,项目名称:pyu2,代码行数:7,代码来源:conftest.py


示例20: setUp

 def setUp(self):
     f = tempfile.NamedTemporaryFile(mode='wb', delete=False)
     self.tempFilePath = f.name
     f.write(b"I'm a temporary file for testing\n")
     f.close()
     self.mock_s3 = mock_s3()
     self.mock_s3.start()
开发者ID:17zuoye,项目名称:luigi,代码行数:7,代码来源:s3_test.py



注:本文中的moto.mock_s3函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python compat.OrderedDict类代码示例发布时间:2022-05-27
下一篇:
Python moto.mock_ec2函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap