• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python rbd.RBD类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中rbd.RBD的典型用法代码示例。如果您正苦于以下问题:Python RBD类的具体用法?Python RBD怎么用?Python RBD使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了RBD类的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_rename

def test_rename():
    """Rename the shared test image to a temporary name and back again.

    After each rename the pool listing must contain exactly the new name.
    """
    client = RBD()
    temp_name = get_temp_image_name()
    for src, dst in ((image_name, temp_name), (temp_name, image_name)):
        client.rename(ioctx, src, dst)
        eq([dst], client.list(ioctx))
开发者ID:OyTao,项目名称:ceph-learning,代码行数:7,代码来源:test_rbd.py


示例2: refresh_rbd_stats_pools

    def refresh_rbd_stats_pools(self, pools):
        """Rebuild the cached per-pool image table kept in ``self.rbd_stats``.

        For each pool in ``pools`` the images of every selected namespace are
        listed and stored under ``self.rbd_stats['pools'][pool_id]['images']``,
        keyed by namespace name and then by image id.  The counters ('c') of
        images that were already cached are carried over; newly seen images
        start with one ``[0, 0]`` pair per entry of
        ``self.rbd_stats['counters_info']``.

        :param pools: mapping of pool name to a collection of namespace
            names.  An empty collection selects the default namespace ('')
            plus every namespace returned by ``RBD.namespace_list``.

        Any exception raised while processing one pool is logged and the
        remaining pools are still processed.  ``pools_refresh_time`` is
        updated at the end in all cases.
        """
        self.log.debug('refreshing rbd pools %s' % (pools))

        rbd = RBD()
        counters_info = self.rbd_stats['counters_info']
        for pool_name, cfg_ns_names in pools.items():
            try:
                pool_id = self.rados.pool_lookup(pool_name)
                with self.rados.open_ioctx(pool_name) as ioctx:
                    pool = self.rbd_stats['pools'].setdefault(
                        pool_id, {'images': {}})
                    pool['name'] = pool_name
                    pool['ns_names'] = cfg_ns_names
                    if cfg_ns_names:
                        nspace_names = list(cfg_ns_names)
                    else:
                        nspace_names = [''] + rbd.namespace_list(ioctx)
                    # Iterate over a snapshot of the keys: deleting entries
                    # from a dict while iterating it raises RuntimeError.
                    for nspace_name in list(pool['images']):
                        if nspace_name not in nspace_names:
                            del pool['images'][nspace_name]
                    for nspace_name in nspace_names:
                        if (nspace_name and
                                not rbd.namespace_exists(ioctx, nspace_name)):
                            self.log.debug('unknown namespace %s for pool %s' %
                                           (nspace_name, pool_name))
                            continue
                        ioctx.set_namespace(nspace_name)
                        namespace = pool['images'].setdefault(nspace_name, {})
                        images = {}
                        # Reuse the handle created above instead of building
                        # a new RBD() for every namespace.
                        for image_meta in rbd.list2(ioctx):
                            image = {'n': image_meta['name']}
                            image_id = image_meta['id']
                            if image_id in namespace:
                                image['c'] = namespace[image_id]['c']
                            else:
                                image['c'] = [[0, 0] for _ in counters_info]
                            images[image_id] = image
                        pool['images'][nspace_name] = images
            except Exception as e:
                # Deliberately best-effort: one bad pool must not stop the
                # refresh of the others.
                self.log.error('failed listing pool %s: %s' % (pool_name, e))
        self.rbd_stats['pools_refresh_time'] = time.time()
开发者ID:IlsooByun,项目名称:ceph,代码行数:44,代码来源:module.py


示例3: setUp

 def setUp(self):
     """Create the parent image, a protected snapshot and a clone of it.

     The output of ``rand_data(256)`` is written at offset ``IMG_SIZE // 2``
     before "snap1" is created, protected and cloned into the image "clone".
     """
     global ioctx
     global features
     self.rbd = RBD()
     create_image()
     self.image = Image(ioctx, IMG_NAME)
     data = rand_data(256)
     # Floor division keeps the offset an integer on Python 3 as well.
     self.image.write(data, IMG_SIZE // 2)
     self.image.create_snap("snap1")
     self.image.protect_snap("snap1")
     self.rbd.clone(ioctx, IMG_NAME, "snap1", ioctx, "clone", features)
     self.clone = Image(ioctx, "clone")
开发者ID:AlphaStaxLLC,项目名称:ceph,代码行数:13,代码来源:test_rbd.py


示例4: setUp

 def setUp(self):
     """Prepare a parent image with a protected snapshot and one clone.

     The clone gets a temporary name, kept in ``self.clone_name``.
     """
     global ioctx, features
     self.rbd = RBD()
     create_image()
     self.image = Image(ioctx, image_name)
     payload = rand_data(256)
     middle = IMG_SIZE // 2
     self.image.write(payload, middle)
     snap = "snap1"
     self.image.create_snap(snap)
     self.image.protect_snap(snap)
     self.clone_name = get_temp_image_name()
     self.rbd.clone(ioctx, image_name, snap, ioctx, self.clone_name, features)
     self.clone = Image(ioctx, self.clone_name)
开发者ID:carmark,项目名称:ceph,代码行数:14,代码来源:test_rbd.py


示例5: TestClone

class TestClone(object):
    """Tests for a clone created from a protected snapshot of the test image."""

    @require_features([RBD_FEATURE_LAYERING])
    def setUp(self):
        """Create the parent image, snapshot 'snap1' and a clone of it."""
        global ioctx
        global features
        self.rbd = RBD()
        create_image()
        self.image = Image(ioctx, image_name)
        data = rand_data(256)
        # Floor division keeps the offset an integer on Python 3 as well.
        self.image.write(data, IMG_SIZE // 2)
        self.image.create_snap('snap1')
        self.image.protect_snap('snap1')
        self.clone_name = get_temp_image_name()
        self.rbd.clone(ioctx, image_name, 'snap1', ioctx, self.clone_name,
                       features)
        self.clone = Image(ioctx, self.clone_name)

    def tearDown(self):
        """Remove the clone first, then the snapshot and the parent image."""
        global ioctx
        self.clone.close()
        self.rbd.remove(ioctx, self.clone_name)
        self.image.unprotect_snap('snap1')
        self.image.remove_snap('snap1')
        self.image.close()
        remove_image()

    def test_unprotected(self):
        """Cloning from an unprotected snapshot raises InvalidArgument."""
        self.image.create_snap('snap2')
        global features
        clone_name2 = get_temp_image_name()
        assert_raises(InvalidArgument, self.rbd.clone, ioctx, image_name,
                      'snap2', ioctx, clone_name2, features)
        self.image.remove_snap('snap2')

    def test_unprotect_with_children(self):
        """A snapshot with clones can be neither removed nor unprotected."""
        global features
        # can't remove a snapshot that has dependent clones
        assert_raises(ImageBusy, self.image.remove_snap, 'snap1')

        # validate parent info of clone created by TestClone.setUp
        (pool, image, snap) = self.clone.parent_info()
        eq(pool, pool_name)
        eq(image, image_name)
        eq(snap, 'snap1')

        # create a new pool...
        pool_name2 = get_temp_pool_name()
        rados.create_pool(pool_name2)
        other_ioctx = rados.open_ioctx(pool_name2)

        # ...with a clone of the same parent
        other_clone_name = get_temp_image_name()
        self.rbd.clone(ioctx, image_name, 'snap1', other_ioctx,
                       other_clone_name, features)
        self.other_clone = Image(other_ioctx, other_clone_name)
        # validate its parent info
        (pool, image, snap) = self.other_clone.parent_info()
        eq(pool, pool_name)
        eq(image, image_name)
        eq(snap, 'snap1')

        # can't unprotect snap with children
        assert_raises(ImageBusy, self.image.unprotect_snap, 'snap1')

        # 2 children, check that cannot remove the parent snap
        assert_raises(ImageBusy, self.image.remove_snap, 'snap1')

        # close and remove other pool's clone
        self.other_clone.close()
        self.rbd.remove(other_ioctx, other_clone_name)

        # check that we cannot yet remove the parent snap
        assert_raises(ImageBusy, self.image.remove_snap, 'snap1')

        other_ioctx.close()
        rados.delete_pool(pool_name2)

        # unprotect, remove parent snap happen in cleanup, and should succeed

    def test_stat(self):
        """The clone has the parent's size and overlaps it completely."""
        image_info = self.image.stat()
        clone_info = self.clone.stat()
        eq(clone_info['size'], image_info['size'])
        eq(clone_info['size'], self.clone.overlap())

    def test_resize_stat(self):
        """Resizing the clone changes its own size, not the parent's."""
        # Floor division: resize() must receive an integer on Python 3.
        half_size = IMG_SIZE // 2
        self.clone.resize(half_size)
        image_info = self.image.stat()
        clone_info = self.clone.stat()
        eq(clone_info['size'], half_size)
        eq(image_info['size'], IMG_SIZE)
        eq(self.clone.overlap(), half_size)

        self.clone.resize(IMG_SIZE * 2)
        image_info = self.image.stat()
        clone_info = self.clone.stat()
        eq(clone_info['size'], IMG_SIZE * 2)
        eq(image_info['size'], IMG_SIZE)
#.........这里部分代码省略.........
开发者ID:OyTao,项目名称:ceph-learning,代码行数:101,代码来源:test_rbd.py


示例6: setUp

 def setUp(self):
     """Create the shared test image and open a handle to it."""
     self.rbd = RBD()
     create_image()
     opened = Image(ioctx, image_name)
     self.image = opened
开发者ID:OyTao,项目名称:ceph-learning,代码行数:4,代码来源:test_rbd.py


示例7: TestImage

class TestImage(object):
    """Basic Image operations against a freshly created test image."""

    def setUp(self):
        """Create the shared test image and open it."""
        self.rbd = RBD()
        create_image()
        self.image = Image(ioctx, image_name)

    def tearDown(self):
        """Close and delete the test image."""
        self.image.close()
        remove_image()

    @require_new_format()
    @blacklist_features([RBD_FEATURE_EXCLUSIVE_LOCK])
    def test_update_features(self):
        """Enabling exclusive-lock adds that bit to the feature mask."""
        features = self.image.features()
        self.image.update_features(RBD_FEATURE_EXCLUSIVE_LOCK, True)
        eq(features | RBD_FEATURE_EXCLUSIVE_LOCK, self.image.features())

    def test_invalidate_cache(self):
        """Written data reads back the same before and after invalidation."""
        # Bytes literals: identical to str on Python 2, required on Python 3.
        self.image.write(b'abc', 0)
        eq(b'abc', self.image.read(0, 3))
        self.image.invalidate_cache()
        eq(b'abc', self.image.read(0, 3))

    def test_stat(self):
        """stat() reports the size and order the image was created with."""
        info = self.image.stat()
        check_stat(info, IMG_SIZE, IMG_ORDER)

    def test_flags(self):
        """A new image has no flags set."""
        flags = self.image.flags()
        eq(0, flags)

    def test_write(self):
        """Write the output of rand_data(256) at offset 0."""
        data = rand_data(256)
        self.image.write(data, 0)

    def test_write_with_fadvise_flags(self):
        """write() accepts the DONTNEED and NOCACHE fadvise flags."""
        data = rand_data(256)
        self.image.write(data, 0, LIBRADOS_OP_FLAG_FADVISE_DONTNEED)
        self.image.write(data, 0, LIBRADOS_OP_FLAG_FADVISE_NOCACHE)

    def test_read(self):
        """A new image reads back as zero bytes."""
        data = self.image.read(0, 20)
        eq(data, b'\0' * 20)

    def test_read_with_fadvise_flags(self):
        """read() accepts the DONTNEED and RANDOM fadvise flags."""
        data = self.image.read(0, 20, LIBRADOS_OP_FLAG_FADVISE_DONTNEED)
        eq(data, b'\0' * 20)
        data = self.image.read(0, 20, LIBRADOS_OP_FLAG_FADVISE_RANDOM)
        eq(data, b'\0' * 20)

    def test_large_write(self):
        """Write IMG_SIZE worth of rand_data output in one call."""
        data = rand_data(IMG_SIZE)
        self.image.write(data, 0)

    def test_large_read(self):
        """Read the whole new image in one call; it is all zero bytes."""
        data = self.image.read(0, IMG_SIZE)
        eq(data, b'\0' * IMG_SIZE)

    def test_write_read(self):
        """Data written at an offset is read back unchanged."""
        data = rand_data(256)
        offset = 50
        self.image.write(data, offset)
        read = self.image.read(offset, 256)
        eq(data, read)

    def test_read_bad_offset(self):
        """Reading from beyond the end of the image raises InvalidArgument."""
        assert_raises(InvalidArgument, self.image.read, IMG_SIZE + 1, IMG_SIZE)

    def test_resize(self):
        """stat() reflects the new size after growing the image."""
        new_size = IMG_SIZE * 2
        self.image.resize(new_size)
        info = self.image.stat()
        check_stat(info, new_size, IMG_ORDER)

    def test_size(self):
        """size() follows the selected snapshot, or the head for None."""
        eq(IMG_SIZE, self.image.size())
        self.image.create_snap('snap1')
        new_size = IMG_SIZE * 2
        self.image.resize(new_size)
        eq(new_size, self.image.size())
        self.image.create_snap('snap2')
        self.image.set_snap('snap2')
        eq(new_size, self.image.size())
        self.image.set_snap('snap1')
        eq(IMG_SIZE, self.image.size())
        self.image.set_snap(None)
        eq(new_size, self.image.size())
        self.image.remove_snap('snap1')
        self.image.remove_snap('snap2')

    def test_resize_down(self):
        """Data beyond the new end is zeroed after shrinking and regrowing."""
        # Floor division keeps sizes and offsets integers on Python 3.
        new_size = IMG_SIZE // 2
        data = rand_data(256)
        self.image.write(data, IMG_SIZE // 2)
        self.image.resize(new_size)
        self.image.resize(IMG_SIZE)
        read = self.image.read(IMG_SIZE // 2, 256)
        eq(b'\0' * 256, read)

#.........这里部分代码省略.........
开发者ID:OyTao,项目名称:ceph-learning,代码行数:101,代码来源:test_rbd.py


示例8: TestImage

class TestImage(object):
    """Basic Image operations against a freshly created test image."""

    def setUp(self):
        """Create the shared test image and open it."""
        self.rbd = RBD()
        create_image()
        self.image = Image(ioctx, IMG_NAME)

    def tearDown(self):
        """Close and delete the test image."""
        self.image.close()
        remove_image()

    def test_stat(self):
        """stat() reports the size and order the image was created with."""
        info = self.image.stat()
        check_stat(info, IMG_SIZE, IMG_ORDER)

    def test_write(self):
        """Write the output of rand_data(256) at offset 0."""
        data = rand_data(256)
        self.image.write(data, 0)

    def test_read(self):
        """A new image reads back as zero bytes."""
        data = self.image.read(0, 20)
        # Bytes literals: identical to str on Python 2, required on Python 3.
        eq(data, b'\0' * 20)

    def test_large_write(self):
        """Write IMG_SIZE worth of rand_data output in one call."""
        data = rand_data(IMG_SIZE)
        self.image.write(data, 0)

    def test_large_read(self):
        """Read the whole new image in one call; it is all zero bytes."""
        data = self.image.read(0, IMG_SIZE)
        eq(data, b'\0' * IMG_SIZE)

    def test_write_read(self):
        """Data written at an offset is read back unchanged."""
        data = rand_data(256)
        offset = 50
        self.image.write(data, offset)
        read = self.image.read(offset, 256)
        eq(data, read)

    def test_read_bad_offset(self):
        """Reading from beyond the end of the image raises InvalidArgument."""
        assert_raises(InvalidArgument, self.image.read, IMG_SIZE + 1, IMG_SIZE)

    def test_resize(self):
        """stat() reflects the new size after growing the image."""
        new_size = IMG_SIZE * 2
        self.image.resize(new_size)
        info = self.image.stat()
        check_stat(info, new_size, IMG_ORDER)

    def test_resize_down(self):
        """Data beyond the new end is zeroed after shrinking and regrowing."""
        # Floor division keeps sizes and offsets integers on Python 3.
        new_size = IMG_SIZE // 2
        data = rand_data(256)
        self.image.write(data, IMG_SIZE // 2)
        self.image.resize(new_size)
        self.image.resize(IMG_SIZE)
        read = self.image.read(IMG_SIZE // 2, 256)
        eq(b'\0' * 256, read)

    def test_resize_bytes(self):
        """Shrinking to a byte-granular size keeps data below the cut only."""
        new_size = IMG_SIZE // 2 - 5
        data = rand_data(256)
        self.image.write(data, IMG_SIZE // 2 - 10)
        self.image.resize(new_size)
        self.image.resize(IMG_SIZE)
        read = self.image.read(IMG_SIZE // 2 - 10, 5)
        eq(data[:5], read)
        read = self.image.read(IMG_SIZE // 2 - 5, 251)
        eq(b'\0' * 251, read)

    def test_copy(self):
        """copy() duplicates the data; copying onto an existing name fails."""
        global ioctx
        data = rand_data(256)
        self.image.write(data, 256)
        self.image.copy(ioctx, IMG_NAME + '2')
        assert_raises(ImageExists, self.image.copy, ioctx, IMG_NAME + '2')
        copy = Image(ioctx, IMG_NAME + '2')
        copy_data = copy.read(256, 256)
        copy.close()
        self.rbd.remove(ioctx, IMG_NAME + '2')
        eq(data, copy_data)

    def test_create_snap(self):
        """A snapshot keeps the content the image had when it was taken."""
        global ioctx
        self.image.create_snap('snap1')
        read = self.image.read(0, 256)
        eq(read, b'\0' * 256)
        data = rand_data(256)
        self.image.write(data, 0)
        read = self.image.read(0, 256)
        eq(read, data)
        at_snapshot = Image(ioctx, IMG_NAME, 'snap1')
        snap_data = at_snapshot.read(0, 256)
        at_snapshot.close()
        eq(snap_data, b'\0' * 256)
        self.image.remove_snap('snap1')

    def test_list_snaps(self):
        """list_snaps() yields the snapshots in creation order."""
        eq([], list(self.image.list_snaps()))
        self.image.create_snap('snap1')
        # Build real lists: on Python 3 a map object never equals a list.
        eq(['snap1'], [snap['name'] for snap in self.image.list_snaps()])
        self.image.create_snap('snap2')
        eq(['snap1', 'snap2'],
           [snap['name'] for snap in self.image.list_snaps()])
#.........这里部分代码省略.........
开发者ID:kritivasas,项目名称:ceph,代码行数:101,代码来源:test_rbd.py


示例9: TestClone

class TestClone(object):
    """Tests for a clone created from a protected snapshot of the test image."""

    @require_features([RBD_FEATURE_LAYERING])
    def setUp(self):
        """Create the parent image, snapshot "snap1" and the image "clone"."""
        global ioctx
        global features
        self.rbd = RBD()
        create_image()
        self.image = Image(ioctx, IMG_NAME)
        data = rand_data(256)
        # Floor division keeps the offset an integer on Python 3 as well.
        self.image.write(data, IMG_SIZE // 2)
        self.image.create_snap("snap1")
        self.image.protect_snap("snap1")
        self.rbd.clone(ioctx, IMG_NAME, "snap1", ioctx, "clone", features)
        self.clone = Image(ioctx, "clone")

    def tearDown(self):
        """Remove the clone first, then the snapshot and the parent image."""
        global ioctx
        self.clone.close()
        self.rbd.remove(ioctx, "clone")
        self.image.unprotect_snap("snap1")
        self.image.remove_snap("snap1")
        self.image.close()
        remove_image()

    def test_unprotected(self):
        """Cloning from an unprotected snapshot raises InvalidArgument."""
        self.image.create_snap("snap2")
        global features
        assert_raises(InvalidArgument, self.rbd.clone, ioctx, IMG_NAME,
                      "snap2", ioctx, "clone2", features)
        self.image.remove_snap("snap2")

    def test_unprotect_with_children(self):
        """A snapshot with clones can be neither removed nor unprotected."""
        global features
        # can't remove a snapshot that has dependent clones
        assert_raises(ImageBusy, self.image.remove_snap, "snap1")

        # validate parent info of clone created by TestClone.setUp
        (pool, image, snap) = self.clone.parent_info()
        eq(pool, "rbd")
        eq(image, IMG_NAME)
        eq(snap, "snap1")

        # create a new pool...
        rados.create_pool("rbd2")
        other_ioctx = rados.open_ioctx("rbd2")

        # ...with a clone of the same parent
        self.rbd.clone(ioctx, IMG_NAME, "snap1", other_ioctx, "other_clone",
                       features)
        self.other_clone = Image(other_ioctx, "other_clone")
        # validate its parent info
        (pool, image, snap) = self.other_clone.parent_info()
        eq(pool, "rbd")
        eq(image, IMG_NAME)
        eq(snap, "snap1")

        # can't unprotect snap with children
        assert_raises(ImageBusy, self.image.unprotect_snap, "snap1")

        # 2 children, check that cannot remove the parent snap
        assert_raises(ImageBusy, self.image.remove_snap, "snap1")

        # close and remove other pool's clone
        self.other_clone.close()
        self.rbd.remove(other_ioctx, "other_clone")

        # check that we cannot yet remove the parent snap
        assert_raises(ImageBusy, self.image.remove_snap, "snap1")

        other_ioctx.close()
        rados.delete_pool("rbd2")

        # unprotect, remove parent snap happen in cleanup, and should succeed

    def test_stat(self):
        """The clone has the parent's size and overlaps it completely."""
        image_info = self.image.stat()
        clone_info = self.clone.stat()
        eq(clone_info["size"], image_info["size"])
        eq(clone_info["size"], self.clone.overlap())

    def test_resize_stat(self):
        """Growing the clone again does not restore the parent overlap."""
        # Floor division: resize() must receive an integer on Python 3.
        half_size = IMG_SIZE // 2
        self.clone.resize(half_size)
        image_info = self.image.stat()
        clone_info = self.clone.stat()
        eq(clone_info["size"], half_size)
        eq(image_info["size"], IMG_SIZE)
        eq(self.clone.overlap(), half_size)

        self.clone.resize(IMG_SIZE * 2)
        image_info = self.image.stat()
        clone_info = self.clone.stat()
        eq(clone_info["size"], IMG_SIZE * 2)
        eq(image_info["size"], IMG_SIZE)
        eq(self.clone.overlap(), half_size)

    def test_resize_io(self):
        """After shrinking, the clone still reads the parent's data."""
        parent_data = self.image.read(IMG_SIZE // 2, 256)
        self.clone.resize(IMG_SIZE // 2 + 128)
        child_data = self.clone.read(IMG_SIZE // 2, 128)
        eq(child_data, parent_data[:128])
        self.clone.resize(IMG_SIZE)
#.........这里部分代码省略.........
开发者ID:AlphaStaxLLC,项目名称:ceph,代码行数:101,代码来源:test_rbd.py


示例10: TestImage

class TestImage(object):
    """Basic Image operations against a freshly created test image."""

    def setUp(self):
        """Create the shared test image and open it."""
        self.rbd = RBD()
        create_image()
        self.image = Image(ioctx, IMG_NAME)

    def tearDown(self):
        """Close and delete the test image."""
        self.image.close()
        remove_image()

    def test_stat(self):
        """stat() reports the size and order the image was created with."""
        info = self.image.stat()
        check_stat(info, IMG_SIZE, IMG_ORDER)

    def test_write(self):
        """Write the output of rand_data(256) at offset 0."""
        data = rand_data(256)
        self.image.write(data, 0)

    def test_read(self):
        """A new image reads back as zero bytes."""
        data = self.image.read(0, 20)
        # Bytes literals: identical to str on Python 2, required on Python 3.
        eq(data, b"\0" * 20)

    def test_large_write(self):
        """Write IMG_SIZE worth of rand_data output in one call."""
        data = rand_data(IMG_SIZE)
        self.image.write(data, 0)

    def test_large_read(self):
        """Read the whole new image in one call; it is all zero bytes."""
        data = self.image.read(0, IMG_SIZE)
        eq(data, b"\0" * IMG_SIZE)

    def test_write_read(self):
        """Data written at an offset is read back unchanged."""
        data = rand_data(256)
        offset = 50
        self.image.write(data, offset)
        read = self.image.read(offset, 256)
        eq(data, read)

    def test_read_bad_offset(self):
        """Reading from beyond the end of the image raises InvalidArgument."""
        assert_raises(InvalidArgument, self.image.read, IMG_SIZE + 1, IMG_SIZE)

    def test_resize(self):
        """stat() reflects the new size after growing the image."""
        new_size = IMG_SIZE * 2
        self.image.resize(new_size)
        info = self.image.stat()
        check_stat(info, new_size, IMG_ORDER)

    def test_size(self):
        """size() follows the selected snapshot, or the head for None."""
        eq(IMG_SIZE, self.image.size())
        self.image.create_snap("snap1")
        new_size = IMG_SIZE * 2
        self.image.resize(new_size)
        eq(new_size, self.image.size())
        self.image.create_snap("snap2")
        self.image.set_snap("snap2")
        eq(new_size, self.image.size())
        self.image.set_snap("snap1")
        eq(IMG_SIZE, self.image.size())
        self.image.set_snap(None)
        eq(new_size, self.image.size())
        self.image.remove_snap("snap1")
        self.image.remove_snap("snap2")

    def test_resize_down(self):
        """Data beyond the new end is zeroed after shrinking and regrowing."""
        # Floor division keeps sizes and offsets integers on Python 3.
        new_size = IMG_SIZE // 2
        data = rand_data(256)
        self.image.write(data, IMG_SIZE // 2)
        self.image.resize(new_size)
        self.image.resize(IMG_SIZE)
        read = self.image.read(IMG_SIZE // 2, 256)
        eq(b"\0" * 256, read)

    def test_resize_bytes(self):
        """Shrinking to a byte-granular size keeps data below the cut only."""
        new_size = IMG_SIZE // 2 - 5
        data = rand_data(256)
        self.image.write(data, IMG_SIZE // 2 - 10)
        self.image.resize(new_size)
        self.image.resize(IMG_SIZE)
        read = self.image.read(IMG_SIZE // 2 - 10, 5)
        eq(data[:5], read)
        read = self.image.read(IMG_SIZE // 2 - 5, 251)
        eq(b"\0" * 251, read)

    def test_copy(self):
        """copy() duplicates the data; copying onto an existing name fails."""
        global ioctx
        data = rand_data(256)
        self.image.write(data, 256)
        self.image.copy(ioctx, IMG_NAME + "2")
        assert_raises(ImageExists, self.image.copy, ioctx, IMG_NAME + "2")
        copy = Image(ioctx, IMG_NAME + "2")
        copy_data = copy.read(256, 256)
        copy.close()
        self.rbd.remove(ioctx, IMG_NAME + "2")
        eq(data, copy_data)

    def test_create_snap(self):
        """After creating a snapshot the image still reads as zero bytes."""
        global ioctx
        self.image.create_snap("snap1")
        read = self.image.read(0, 256)
        eq(read, b"\0" * 256)
        data = rand_data(256)
#.........这里部分代码省略.........
开发者ID:AlphaStaxLLC,项目名称:ceph,代码行数:101,代码来源:test_rbd.py


示例11: TestImage

class TestImage(object):
    """Basic Image operations against a freshly created test image."""

    def setUp(self):
        """Create the shared test image and open it."""
        self.rbd = RBD()
        create_image()
        self.image = Image(ioctx, image_name)

    def tearDown(self):
        """Close and delete the test image, then drop the handle."""
        self.image.close()
        remove_image()
        self.image = None

    @require_new_format()
    @blacklist_features([RBD_FEATURE_EXCLUSIVE_LOCK])
    def test_update_features(self):
        """Enabling exclusive-lock adds that bit to the feature mask."""
        before = self.image.features()
        self.image.update_features(RBD_FEATURE_EXCLUSIVE_LOCK, True)
        expected = before | RBD_FEATURE_EXCLUSIVE_LOCK
        eq(expected, self.image.features())

    @require_features([RBD_FEATURE_STRIPINGV2])
    def test_create_with_params(self):
        """An image created with explicit order/striping reports them back."""
        global features
        tmp_name = get_temp_image_name()
        order, stripe_unit, stripe_count = 20, 1 << 20, 10
        self.rbd.create(ioctx, tmp_name, IMG_SIZE, order, False, features,
                        stripe_unit, stripe_count)
        created = Image(ioctx, tmp_name)
        check_stat(created.stat(), IMG_SIZE, order)
        eq(created.features(), features)
        eq(created.stripe_unit(), stripe_unit)
        eq(created.stripe_count(), stripe_count)
        created.close()
        RBD().remove(ioctx, tmp_name)

    @require_new_format()
    def test_id(self):
        """id() is not empty for a new-format image."""
        image_id = self.image.id()
        assert_not_equal(b'', image_id)

    def test_block_name_prefix(self):
        """block_name_prefix() is not empty."""
        prefix = self.image.block_name_prefix()
        assert_not_equal(b'', prefix)

    def test_invalidate_cache(self):
        """Written data reads back the same before and after invalidation."""
        payload = b'abc'
        self.image.write(payload, 0)
        eq(payload, self.image.read(0, 3))
        self.image.invalidate_cache()
        eq(payload, self.image.read(0, 3))

    def test_stat(self):
        """stat() reports the size and order the image was created with."""
        check_stat(self.image.stat(), IMG_SIZE, IMG_ORDER)

    def test_flags(self):
        """A new image has no flags set."""
        eq(0, self.image.flags())

    def test_image_auto_close(self):
        """Open a second handle and never close it explicitly."""
        # NOTE(review): presumably relies on the handle cleaning up after
        # itself when it goes out of scope -- confirm against Image.
        unclosed = Image(ioctx, image_name)

    def test_write(self):
        """Write the output of rand_data(256) at offset 0."""
        self.image.write(rand_data(256), 0)

    def test_write_with_fadvise_flags(self):
        """write() accepts the DONTNEED and NOCACHE fadvise flags."""
        payload = rand_data(256)
        for flag in (LIBRADOS_OP_FLAG_FADVISE_DONTNEED,
                     LIBRADOS_OP_FLAG_FADVISE_NOCACHE):
            self.image.write(payload, 0, flag)

    def test_read(self):
        """A new image reads back as zero bytes."""
        eq(self.image.read(0, 20), b'\0' * 20)

    def test_read_with_fadvise_flags(self):
        """read() accepts the DONTNEED and RANDOM fadvise flags."""
        for flag in (LIBRADOS_OP_FLAG_FADVISE_DONTNEED,
                     LIBRADOS_OP_FLAG_FADVISE_RANDOM):
            eq(self.image.read(0, 20, flag), b'\0' * 20)

    def test_large_write(self):
        """Write IMG_SIZE worth of rand_data output in one call."""
        self.image.write(rand_data(IMG_SIZE), 0)

    def test_large_read(self):
        """Read the whole new image in one call; it is all zero bytes."""
        eq(self.image.read(0, IMG_SIZE), b'\0' * IMG_SIZE)

    def test_write_read(self):
        """Data written at an offset is read back unchanged."""
        offset = 50
        payload = rand_data(256)
        self.image.write(payload, offset)
        eq(payload, self.image.read(offset, 256))

    def test_read_bad_offset(self):
        """Reading from beyond the end of the image raises InvalidArgument."""
        bad_offset = IMG_SIZE + 1
        assert_raises(InvalidArgument, self.image.read, bad_offset, IMG_SIZE)

    def test_resize(self):
        new_size = IMG_SIZE * 2
#.........这里部分代码省略.........
开发者ID:Intel-bigdata,项目名称:ceph,代码行数:101,代码来源:test_rbd.py


示例12: setUp

 def setUp(self):
     """Switch the pool to pool-mode mirroring and open the test image.

     The mirror mode found before the switch is kept in
     ``self.initial_mirror_mode``.
     """
     client = RBD()
     self.rbd = client
     self.initial_mirror_mode = client.mirror_mode_get(ioctx)
     client.mirror_mode_set(ioctx, RBD_MIRROR_MODE_POOL)
     create_image()
     self.image = Image(ioctx, image_name)
开发者ID:Intel-bigdata,项目名称:ceph,代码行数:6,代码来源:test_rbd.py


示例13: TestMirroring

class TestMirroring(object):
    """Tests for pool-level and image-level mirroring calls."""

    @staticmethod
    def check_info(info, global_id, state, primary=None):
        """Assert that a mirror info dict carries the expected values.

        ``primary`` is only compared when it is not None.
        """
        eq(global_id, info['global_id'])
        eq(state, info['state'])
        if primary is None:
            return
        eq(primary, info['primary'])

    def setUp(self):
        """Remember the mirror mode, switch to pool mode, open the image."""
        self.rbd = RBD()
        self.initial_mirror_mode = self.rbd.mirror_mode_get(ioctx)
        self.rbd.mirror_mode_set(ioctx, RBD_MIRROR_MODE_POOL)
        create_image()
        self.image = Image(ioctx, image_name)

    def tearDown(self):
        """Delete the test image and restore the saved mirror mode."""
        self.image.close()
        remove_image()
        self.rbd.mirror_mode_set(ioctx, self.initial_mirror_mode)

    def test_mirror_peer(self):
        """Add a peer, change its cluster and client names, then remove it."""
        def current_peers():
            return list(self.rbd.mirror_peer_list(ioctx))

        eq([], current_peers())
        cluster, client = "test_cluster", "test_client"
        uuid = self.rbd.mirror_peer_add(ioctx, cluster, client)
        assert uuid
        expected = {'uuid': uuid, 'cluster_name': cluster,
                    'client_name': client}
        eq([expected], current_peers())

        cluster = "test_cluster1"
        self.rbd.mirror_peer_set_cluster(ioctx, uuid, cluster)
        client = "test_client1"
        self.rbd.mirror_peer_set_client(ioctx, uuid, client)
        expected = {'uuid': uuid, 'cluster_name': cluster,
                    'client_name': client}
        eq([expected], current_peers())

        self.rbd.mirror_peer_remove(ioctx, uuid)
        eq([], current_peers())

    @require_features([RBD_FEATURE_EXCLUSIVE_LOCK,
                       RBD_FEATURE_JOURNALING])
    def test_mirror_image(self):
        """Walk one image through disable, enable, demote and promote."""
        self.rbd.mirror_mode_set(ioctx, RBD_MIRROR_MODE_IMAGE)
        self.image.mirror_image_disable(True)
        self.check_info(self.image.mirror_image_get_info(), '',
                        RBD_MIRROR_IMAGE_DISABLED, False)

        self.image.mirror_image_enable()
        info = self.image.mirror_image_get_info()
        global_id = info['global_id']
        self.check_info(info, global_id, RBD_MIRROR_IMAGE_ENABLED, True)

        self.rbd.mirror_mode_set(ioctx, RBD_MIRROR_MODE_POOL)
        try:
            self.image.mirror_image_disable(True)
        except InvalidArgument:
            fail = True
        else:
            fail = False
        eq(True, fail) # Fails because of mirror mode pool

        self.image.mirror_image_demote()
        self.check_info(self.image.mirror_image_get_info(), global_id,
                        RBD_MIRROR_IMAGE_ENABLED, False)

        self.image.mirror_image_resync()

        self.image.mirror_image_promote(True)
        self.check_info(self.image.mirror_image_get_info(), global_id,
                        RBD_MIRROR_IMAGE_ENABLED, True)

        try:
            self.image.mirror_image_resync()
        except InvalidArgument:
            fail = True
        else:
            fail = False
        eq(True, fail) # Fails because it is primary

        status = self.image.mirror_image_get_status()
        eq(image_name, status['name'])
        eq(False, status['up'])
        eq(MIRROR_IMAGE_STATUS_STATE_UNKNOWN, status['state'])
        self.check_info(status['info'], global_id,
                        RBD_MIRROR_IMAGE_ENABLED, True)

    @require_features([RBD_FEATURE_EXCLUSIVE_LOCK,
                       RBD_FEATURE_JOURNALING])
    def test_mirror_image_status(self):
        info = self.image.mirror_image_get_info()
        global_id = info['global_id']
        state = info['state']
        primary = info['primary']
#.........这里部分代码省略.........
开发者ID:Intel-bigdata,项目名称:ceph,代码行数:101,代码来源:test_rbd.py


示例14: TestImage

class TestImage(object):
    """Basic Image operations against a freshly created test image."""

    def setUp(self):
        """Create the shared test image and open it."""
        self.rbd = RBD()
        create_image()
        self.image = Image(ioctx, image_name)

    def tearDown(self):
        """Close and delete the test image."""
        self.image.close()
        remove_image()

    @require_new_format()
    @blacklist_features([RBD_FEATURE_EXCLUSIVE_LOCK])
    def test_update_features(self):
        """Enabling exclusive-lock adds that bit to the feature mask."""
        before = self.image.features()
        self.image.update_features(RBD_FEATURE_EXCLUSIVE_LOCK, True)
        expected = before | RBD_FEATURE_EXCLUSIVE_LOCK
        eq(expected, self.image.features())

    @require_features([RBD_FEATURE_STRIPINGV2])
    def test_create_with_params(self):
        """An image created with explicit order/striping reports them back."""
        global features
        tmp_name = get_temp_image_name()
        order, stripe_unit, stripe_count = 20, 1 << 20, 10
        self.rbd.create(ioctx, tmp_name, IMG_SIZE, order, False, features,
                        stripe_unit, stripe_count)
        created = Image(ioctx, tmp_name)
        check_stat(created.stat(), IMG_SIZE, order)
        eq(created.features(), features)
        eq(created.stripe_unit(), stripe_unit)
        eq(created.stripe_count(), stripe_count)
        created.close()
        RBD().remove(ioctx, tmp_name)

    def test_invalidate_cache(self):
        """Written data reads back the same before and after invalidation."""
        payload = b"abc"
        self.image.write(payload, 0)
        eq(payload, self.image.read(0, 3))
        self.image.invalidate_cache()
        eq(payload, self.image.read(0, 3))

    def test_stat(self):
        """stat() reports the size and order the image was created with."""
        check_stat(self.image.stat(), IMG_SIZE, IMG_ORDER)

    def test_flags(self):
        """A new image has no flags set."""
        eq(0, self.image.flags())

    def test_image_auto_close(self):
        """Open a second handle and never close it explicitly."""
        # NOTE(review): presumably relies on the handle cleaning up after
        # itself when it goes out of scope -- confirm against Image.
        unclosed = Image(ioctx, image_name)

    def test_write(self):
        """Write the output of rand_data(256) at offset 0."""
        self.image.write(rand_data(256), 0)

    def test_write_with_fadvise_flags(self):
        """write() accepts the DONTNEED and NOCACHE fadvise flags."""
        payload = rand_data(256)
        for flag in (LIBRADOS_OP_FLAG_FADVISE_DONTNEED,
                     LIBRADOS_OP_FLAG_FADVISE_NOCACHE):
            self.image.write(payload, 0, flag)

    def test_read(self):
        """A new image reads back as zero bytes."""
        eq(self.image.read(0, 20), b"\0" * 20)

    def test_read_with_fadvise_flags(self):
        """read() accepts the DONTNEED and RANDOM fadvise flags."""
        for flag in (LIBRADOS_OP_FLAG_FADVISE_DONTNEED,
                     LIBRADOS_OP_FLAG_FADVISE_RANDOM):
            eq(self.image.read(0, 20, flag), b"\0" * 20)

    def test_large_write(self):
        """Write IMG_SIZE worth of rand_data output in one call."""
        self.image.write(rand_data(IMG_SIZE), 0)

    def test_large_read(self):
        """Read the whole new image in one call; it is all zero bytes."""
        eq(self.image.read(0, IMG_SIZE), b"\0" * IMG_SIZE)

    def test_write_read(self):
        """Data written at an offset is read back unchanged."""
        offset = 50
        payload = rand_data(256)
        self.image.write(payload, offset)
        eq(payload, self.image.read(offset, 256))

    def test_read_bad_offset(self):
        """Reading from beyond the end of the image raises InvalidArgument."""
        bad_offset = IMG_SIZE + 1
        assert_raises(InvalidArgument, self.image.read, bad_offset, IMG_SIZE)

    def test_resize(self):
        """stat() reflects the new size after growing the image."""
        doubled = IMG_SIZE * 2
        self.image.resize(doubled)
        check_stat(self.image.stat(), doubled, IMG_ORDER)

    def test_size(self):
        """size() reports the new size after a resize."""
        eq(IMG_SIZE, self.image.size())
        self.image.create_snap("snap1")
        doubled = IMG_SIZE * 2
        self.image.resize(doubled)
        eq(doubled, self.image.size())
#.........这里部分代码省略.........
开发者ID:carmark,项目名称:ceph,代码行数:101,代码来源:test_rbd.py


示例15: test_rename

def test_rename():
    """Rename the test image to IMG_NAME + '2' and back again.

    After each rename the pool listing must contain exactly the new name.
    """
    client = RBD()
    renamed = IMG_NAME + '2'
    for src, dst in ((IMG_NAME, renamed), (renamed, IMG_NAME)):
        client.rename(ioctx, src, dst)
        eq([dst], client.list(ioctx))
开发者ID:ctrlaltdel,项目名称:ceph,代码行数:6,代码来源:test_rbd.py


示例16: TestImage

class TestImage(object):
    # Image-level tests. Each test runs against the image that setUp creates
    # with create_image() and that tearDown removes with remove_image().
    #
    # Sizes and offsets use floor division (//) so they stay integers on
    # Python 3, and expected zero buffers are bytes literals; both spellings
    # behave identically on Python 2.

    def setUp(self):
        # Create the test image and open a handle to it.
        self.rbd = RBD()
        create_image()
        self.image = Image(ioctx, IMG_NAME)

    def tearDown(self):
        # Close the handle before removing the image.
        self.image.close()
        remove_image()

    def test_stat(self):
        info = self.image.stat()
        check_stat(info, IMG_SIZE, IMG_ORDER)

    def test_write(self):
        data = rand_data(256)
        self.image.write(data, 0)

    def test_read(self):
        # The first 20 bytes are expected to read back as zeros.
        data = self.image.read(0, 20)
        eq(data, b'\0' * 20)

    def test_large_write(self):
        data = rand_data(IMG_SIZE)
        self.image.write(data, 0)

    def test_large_read(self):
        data = self.image.read(0, IMG_SIZE)
        eq(data, b'\0' * IMG_SIZE)

    def test_write_read(self):
        # Data written at an offset must be read back unchanged.
        data = rand_data(256)
        offset = 50
        self.image.write(data, offset)
        read = self.image.read(offset, 256)
        eq(data, read)

    def test_read_bad_offset(self):
        assert_raises(InvalidArgument, self.image.read, IMG_SIZE + 1, IMG_SIZE)

    def test_resize(self):
        new_size = IMG_SIZE * 2
        self.image.resize(new_size)
        info = self.image.stat()
        check_stat(info, new_size, IMG_ORDER)

    def test_size(self):
        # size() must follow the currently selected snapshot: snap1 was
        # taken at IMG_SIZE, snap2 after doubling the image.
        eq(IMG_SIZE, self.image.size())
        self.image.create_snap('snap1')
        new_size = IMG_SIZE * 2
        self.image.resize(new_size)
        eq(new_size, self.image.size())
        self.image.create_snap('snap2')
        self.image.set_snap('snap2')
        eq(new_size, self.image.size())
        self.image.set_snap('snap1')
        eq(IMG_SIZE, self.image.size())
        # set_snap(None) switches back from the snapshot before the final
        # size check.
        self.image.set_snap(None)
        eq(new_size, self.image.size())
        self.image.remove_snap('snap1')
        self.image.remove_snap('snap2')

    def test_resize_down(self):
        # Data written at the midpoint is cut off by shrinking to half size;
        # after growing back, that range is expected to read as zeros.
        new_size = IMG_SIZE // 2
        data = rand_data(256)
        self.image.write(data, IMG_SIZE // 2)
        self.image.resize(new_size)
        self.image.resize(IMG_SIZE)
        read = self.image.read(IMG_SIZE // 2, 256)
        eq(b'\0' * 256, read)

    def test_resize_bytes(self):
        # Shrink to a size 5 bytes past the start of the written data: the
        # first 5 bytes must survive, the remaining 251 must read as zeros.
        new_size = IMG_SIZE // 2 - 5
        data = rand_data(256)
        self.image.write(data, IMG_SIZE // 2 - 10)
        self.image.resize(new_size)
        self.image.resize(IMG_SIZE)
        read = self.image.read(IMG_SIZE // 2 - 10, 5)
        eq(data[:5], read)
        read = self.image.read(IMG_SIZE // 2 - 5, 251)
        eq(b'\0' * 251, read)

    def test_copy(self):
        # Copy the image, check that copying onto the existing name raises
        # ImageExists, then compare the copied data with what was written.
        data = rand_data(256)
        self.image.write(data, 256)
        self.image.copy(ioctx, IMG_NAME + '2')
        assert_raises(ImageExists, self.image.copy, ioctx, IMG_NAME + '2')
        copy = Image(ioctx, IMG_NAME + '2')
        copy_data = copy.read(256, 256)
        copy.close()
        # Remove the copy before asserting so it is not left behind when the
        # comparison fails.
        self.rbd.remove(ioctx, IMG_NAME + '2')
        eq(data, copy_data)

    def test_create_snap(self):
        global ioctx
        self.image.create_snap('snap1')
        read = self.image.read(0, 256)
        eq(read, b'\0' * 256)
#.........这里部分代码省略.........
开发者ID:HubPeter,项目名称:ceph,代码行数:101,代码来源:test_rbd.py


示例17: TestClone

class TestClone(object):
    # Tests for cloned images. setUp creates a parent image, writes data at
    # its midpoint, snapshots and protects it as 'snap1', and clones it to
    # 'clone'; tearDown undoes those steps in reverse order.
    #
    # Sizes and offsets use floor division (//) so they stay integers on
    # Python 3; on Python 2 the result is the same as with '/'.

    @require_features([RBD_FEATURE_LAYERING])
    def setUp(self):
        self.rbd = RBD()
        create_image()
        self.image = Image(ioctx, IMG_NAME)
        data = rand_data(256)
        self.image.write(data, IMG_SIZE // 2)
        self.image.create_snap('snap1')
        # The snapshot is protected before it is used as a clone parent.
        self.image.protect_snap('snap1')
        self.rbd.clone(ioctx, IMG_NAME, 'snap1', ioctx, 'clone', features)
        self.clone = Image(ioctx, 'clone')

    def tearDown(self):
        # The clone is removed first, then the parent snapshot is
        # unprotected and removed, and finally the parent image itself.
        self.clone.close()
        self.rbd.remove(ioctx, 'clone')
        self.image.unprotect_snap('snap1')
        self.image.remove_snap('snap1')
        self.image.close()
        remove_image()

    def test_unprotected(self):
        # Cloning from a snapshot that was never protected must fail.
        self.image.create_snap('snap2')
        assert_raises(InvalidArgument, self.rbd.clone, ioctx, IMG_NAME, 'snap2', ioctx, 'clone2', features)
        self.image.remove_snap('snap2')

    def test_unprotect_with_children(self):
        # can't remove a snapshot that has dependent clones
        assert_raises(ImageBusy, self.image.remove_snap, 'snap1')

        # validate parent info of clone created by TestClone.setUp
        (pool, image, snap) = self.clone.parent_info()
        eq(pool, 'rbd')
        eq(image, IMG_NAME)
        eq(snap, 'snap1')

        # create a new pool...
        rados.create_pool('rbd2')
        other_ioctx = rados.open_ioctx('rbd2')

        # ...with a clone of the same parent
        self.rbd.clone(ioctx, IMG_NAME, 'snap1', other_ioctx, 'other_clone', features)
        self.other_clone = Image(other_ioctx, 'other_clone')
        # validate its parent info
        (pool, image, snap) = self.other_clone.parent_info()
        eq(pool, 'rbd')
        eq(image, IMG_NAME)
        eq(snap, 'snap1')

        # can't unprotect snap with children
        assert_raises(ImageBusy, self.image.unprotect_snap, 'snap1')

        # 2 children, check that cannot remove the parent snap
        assert_raises(ImageBusy, self.image.remove_snap, 'snap1')

        # close and remove other pool's clone
        self.other_clone.close()
        self.rbd.remove(other_ioctx, 'other_clone')

        # check that we cannot yet remove the parent snap
        assert_raises(ImageBusy, self.image.remove_snap, 'snap1')

        other_ioctx.close()
        rados.delete_pool('rbd2')

        # unprotect, remove parent snap happen in cleanup, and should succeed

    def test_stat(self):
        # A fresh clone reports the parent's size, and its overlap equals
        # that size.
        image_info = self.image.stat()
        clone_info = self.clone.stat()
        eq(clone_info['size'], image_info['size'])
        eq(clone_info['size'], self.clone.overlap())

    def test_resize_stat(self):
        # Shrinking the clone reduces its size and overlap but leaves the
        # parent untouched.
        self.clone.resize(IMG_SIZE // 2)
        image_info = self.image.stat()
        clone_info = self.clone.stat()
        eq(clone_info['size'], IMG_SIZE // 2)
        eq(image_info['size'], IMG_SIZE)
        eq(self.clone.overlap(), IMG_SIZE // 2)

        # Growing the clone afterwards changes its size, while the overlap
        # is expected to stay at the shrunken value.
        self.clone.resize(IMG_SIZE * 2)
        image_info = self.image.stat()
        clone_info = self.clone.stat()
        eq(clone_info['size'], IMG_SIZE * 2)
        eq(image_info['size'], IMG_SIZE)
        eq(self.clone.overlap(), IMG_SIZE // 2)

    def test_resize_io(self):
        parent_data = self.image.read(IMG_SIZE // 2, 256)
        self.clone.resize(IMG_SIZE // 2 + 128)
        child_data = self.clone.read(IMG_SIZE // 2, 128)
        eq(child_data, parent_data[:128])
#.........这里部分代码省略.........
开发者ID:HubPeter,项目名称:ceph,代码行数:101,代码来源:test_rbd.py


示例18: TestImage

class TestImage(object):
    # Image-level tests. Each test runs against the image that setUp creates
    # with create_image() and that tearDown removes with remove_image().
    #
    # Expected zero buffers are bytes literals so the comparisons also hold
    # where read() returns bytes (Python 3); on Python 2, b'...' and '...'
    # are the same type.

    def setUp(self):
        # Create the test image and open a handle to it.
        self.rbd = RBD()
        create_image()
        self.image = Image(ioctx, IMG_NAME)

    def tearDown(self):
        # Close the handle before removing the image.
        self.image.close()
        remove_image()

    def test_stat(self):
        info = self.image.stat()
        check_stat(info, IMG_SIZE, IMG_ORDER)

    def test_write(self):
        data = rand_data(256)
        self.image.write(data, 0)

    def test_read(self):
        # The first 20 bytes are expected to read back as zeros.
        data = self.image.read(0, 20)
        eq(data, b'\0' * 20)

    def test_large_write(self):
        data = rand_data(IMG_SIZE)
        self.image.write(data, 0)

    def test_large_read(self):
        data = self.image.read(0, IMG_SIZE)
        eq(data, b'\0' * IMG_SIZE)

    def test_write_read(self):
        # Data written at an offset must be read back unchanged.
        data = rand_data(256)
        offset = 50
        self.image.write(data, offset)
        read = self.image.read(offset, 256)
        eq(data, read)

    def test_read_bad_offset(self):
        assert_raises(InvalidArgument, self.image.read, IMG_SIZE + 1, IMG_SIZE)

    def test_resize(self):
        new_size = IMG_SIZE * 2
        self.image.resize(new_size)
        info = self.image.stat()
        check_stat(info, new_size, IMG_ORDER)

    def test_copy(self):
        # copy() is expected to return IMG_SIZE, and the copy must hold the
        # data that was written to the source.
        data = rand_data(256)
        self.image.write(data, 256)
        bytes_copied = self.image.copy(ioctx, IMG_NAME + '2')
        copy = Image(ioctx, IMG_NAME + '2')
        copy_data = copy.read(256, 256)
        copy.close()
        # Remove the copy before asserting so it is not left behind when a
        # comparison fails.
        self.rbd.remove(ioctx, IMG_NAME + '2')
        eq(bytes_copied, IMG_SIZE)
        eq(data, copy_data)

    def test_create_snap(self):
        global ioctx
        self.image.create_snap('snap1')
        read = self.image.read(0, 256)
       

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python rbm.RBM类代码示例发布时间:2022-05-26
下一篇:
Python rbd.Image类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap