OpenStack Swift Source Code Analysis: A Detailed Look at the Ring rebalance Algorithm

1 The rebalance method in the Commands class


swift-ring-builder object.builder rebalance

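Running this command first invokes swift/cli/ringbuilder.py. Its main function checks whether the object.builder file exists under /etc/swift; if it does, the file is deserialized to initialize a RingBuilder instance. Then, based on the third argument of the command (rebalance), the rebalance method of the Commands class is called. This method balances the devices that have been added and fills in replica2part2dev (the replica-to-partition-to-device mapping). The implementation is shown below: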

    def rebalance(self):
        """
swift-ring-builder <builder_file> rebalance <seed>
    Attempts to rebalance the ring by reassigning partitions that haven't been
    recently reassigned.
        """
        def get_seed(index):
            try:
                return argv[index]
            except IndexError:
                pass

        devs_changed = builder.devs_changed
        try:
            last_balance = builder.get_balance()
            # the builder performs the actual rebalance
            parts, balance = builder.rebalance(seed=get_seed(3))
        except exceptions.RingBuilderError as e:
            print '-' * 79
            print("An error has occurred during ring validation. Common\n"
                  "causes of failure are rings that are empty or do not\n"
                  "have enough devices to accommodate the replica count.\n"
                  "Original exception message:\n %s" % e.message
                  )
            print '-' * 79
            exit(EXIT_ERROR)
        if not parts:
            print 'No partitions could be reassigned.'
            print 'Either none need to be or none can be due to ' \
                  'min_part_hours [%s].' % builder.min_part_hours
            exit(EXIT_WARNING)
        # If we set device's weight to zero, currently balance will be set
        # special value(MAX_BALANCE) until zero weighted device return all
        # its partitions. So we cannot check balance has changed.
        # Thus we need to check balance or last_balance is special value.
        if not devs_changed and abs(last_balance - balance) < 1 and \
                not (last_balance == MAX_BALANCE and balance == MAX_BALANCE):
            print 'Cowardly refusing to save rebalance as it did not change ' \
                  'at least 1%.'
            exit(EXIT_WARNING)
        try:
            builder.validate()
        except exceptions.RingValidationError as e:
            print '-' * 79
            print("An error has occurred during ring validation. Common\n"
                  "causes of failure are rings that are empty or do not\n"
                  "have enough devices to accommodate the replica count.\n"
                  "Original exception message:\n %s" % e.message
                  )
            print '-' * 79
            exit(EXIT_ERROR)
        print 'Reassigned %d (%.02f%%) partitions. Balance is now %.02f.' % \
              (parts, 100.0 * parts / builder.parts, balance)
        status = EXIT_SUCCESS
        if balance > 5:
            print '-' * 79
            print 'NOTE: Balance of %.02f indicates you should push this ' % \
                  balance
            print '      ring, wait at least %d hours, and rebalance/repush.' \
                  % builder.min_part_hours
            print '-' * 79
            status = EXIT_WARNING
        ts = time()
        builder.get_ring().save(
            pathjoin(backup_dir, '%d.' % ts + basename(ring_file)))
        builder.save(pathjoin(backup_dir, '%d.' % ts + basename(argv[1])))
        builder.get_ring().save(ring_file)
        builder.save(argv[1])
        exit(status)
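The "cowardly refusing" check in the command above is easier to read when pulled out as a predicate. The following is a minimal sketch in Python 3, not the code of swift-ring-builder: should_save is a hypothetical helper, and the value given to MAX_BALANCE is an assumption used only for illustration.

```python
MAX_BALANCE = 999.99  # assumed sentinel value, used here only for illustration


def should_save(devs_changed, last_balance, balance, max_balance=MAX_BALANCE):
    """Hypothetical helper: True when the rebalance result is worth saving."""
    both_at_max = last_balance == max_balance and balance == max_balance
    barely_moved = abs(last_balance - balance) < 1
    refuse = not devs_changed and barely_moved and not both_at_max
    return not refuse


decisions = [
    should_save(False, 10.0, 10.4),                # balance moved by less than 1
    should_save(False, 10.0, 12.5),                # balance moved by 2.5
    should_save(True, 10.0, 10.0),                 # devices were added or removed
    should_save(False, MAX_BALANCE, MAX_BALANCE),  # both values are the sentinel
]
print(decisions)
```

Only the first case is refused: no device changed, the balance moved by less than 1, and neither value is the sentinel.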

    def rebalance(self, seed=None):
        """
        Rebalance the ring.

        This is the main work function of the builder, as it will assign and
        reassign partitions to devices in the ring based on weights, distinct
        zones, recent reassignments, etc.

        The process doesn't always perfectly assign partitions (that'd take a
        lot more analysis and therefore a lot more time -- I had code that did
        that before). Because of this, it keeps rebalancing until the device
        skew (number of partitions a device wants compared to what it has) gets
        below 1% or doesn't change by more than 1% (only happens with ring that
        can't be balanced no matter what -- like with 3 zones of differing
        weights with replicas set to 3).

        :returns: (number_of_partitions_altered, resulting_balance)
        """
        # In other words: once the balance is below 1%, or changes by less
        # than 1% between passes, no further rebalancing pass is made.

        if seed is not None:
            random.seed(seed)

        self._ring = None
        if self._last_part_moves_epoch is None:
            # first rebalance of this ring: initial setup is required
            self._initial_balance()
            self.devs_changed = False
            return self.parts, self.get_balance()
        retval = 0
        self._update_last_part_moves()
        last_balance = 0
        new_parts, removed_part_count = self._adjust_replica2part2dev_size()
        retval += removed_part_count
        self._reassign_parts(new_parts)
        retval += len(new_parts)
        while True:
            reassign_parts = self._gather_reassign_parts()
            self._reassign_parts(reassign_parts)
            retval += len(reassign_parts)
            while self._remove_devs:
                self.devs[self._remove_devs.pop()['id']] = None
            balance = self.get_balance()
            if balance < 1 or abs(last_balance - balance) < 1 or \
                    retval == self.parts:
                break
            last_balance = balance
        self.devs_changed = False
        self.version += 1
        return retval, balance
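The stopping rule of the loop above can be tried out in isolation. This is a small Python 3 sketch under stated assumptions: rebalance_loop is a hypothetical function, and each pass is represented by a made-up (parts_moved, balance_after) pair rather than by real calls into the builder.

```python
def rebalance_loop(passes, total_parts):
    """Replay the stopping rule over made-up (parts_moved, balance) passes."""
    moved = 0
    last_balance = 0
    balance = last_balance
    for parts_moved, balance in passes:
        moved += parts_moved
        # Stop when balanced, when the balance stalls, or when every
        # partition has already been moved once.
        if balance < 1 or abs(last_balance - balance) < 1 or \
                moved == total_parts:
            break
        last_balance = balance
    return moved, balance


stalled = rebalance_loop([(40, 12.0), (10, 4.0), (3, 3.5), (1, 0.2)], 64)
everything_moved = rebalance_loop([(64, 30.0), (5, 2.0)], 64)
print(stalled, everything_moved)
```

In the first run the third pass improves the balance by only 0.5, so the loop stops there even though a fourth pass would have helped; in the second run the loop stops after one pass because all 64 partitions have been moved.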

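    def _initial_balance(self):
        """
        Initial partition assignment is the same as rebalancing an
        existing ring, but with some initial setup beforehand.
        """
        # one byte per partition, recording when that partition last moved
        self._last_part_moves = array('B', (0 for _junk in xrange(self.parts)))
        # the time base against which the values above are measured
        self._last_part_moves_epoch = int(time())

        self._reassign_parts(self._adjust_replica2part2dev_size()[0])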
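The pair of fields set up here, a byte array plus an epoch, can be pictured with a short Python 3 sketch. The aging step is not shown in this article, so age_part_moves below is an assumption about how such a table could be maintained (whole hours added to every entry, capped at 255 because each entry is a single byte), not the builder's own code; new_part_moves is likewise a hypothetical name.

```python
from array import array


def new_part_moves(parts):
    """One unsigned byte per partition, all starting at 0."""
    return array('B', (0 for _ in range(parts)))


def age_part_moves(last_part_moves, epoch, now):
    """Add the whole hours elapsed since epoch to every entry (capped at 255)
    and return the new epoch. Assumed behaviour, for illustration only."""
    elapsed_hours = int(now - epoch) // 3600
    for part in range(len(last_part_moves)):
        last_part_moves[part] = min(last_part_moves[part] + elapsed_hours, 0xFF)
    return int(now)


moves = new_part_moves(4)
moves[3] = 254
epoch = age_part_moves(moves, epoch=0, now=2 * 3600 + 30)
print(list(moves), epoch)
```

Using array('B') keeps the table at one byte per partition, which matters when a ring has a large number of partitions.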




    def _reassign_parts(self, reassign_parts):
        """
        For an existing ring data set, partitions are reassigned similarly to
        the initial assignment. The devices are ordered by how many partitions
        they still want and kept in that order throughout the process. The
        gathered partitions are iterated through, assigning them to devices
        according to the "most wanted" while keeping the replicas as "far
        apart" as possible. Two different regions are considered the
        farthest-apart things, followed by zones, then different ip/port pairs
        within a zone; the least-far-apart things are different devices with
        the same ip/port pair in the same zone.

        If you want more replicas than devices, you won't get all your
        replicas.

        :param reassign_parts: An iterable of (part, replicas_to_replace)
                               pairs. replicas_to_replace is an iterable of the
                               replica (an int) to replace for that partition.
                               replicas_to_replace may be shared for multiple
                               partitions, so be sure you do not modify it.
        """
        # Sample value of reassign_parts for a ring with 64 partitions and
        # 3 replicas:
        #   [(0, [0, 1, 2]), (1, [0, 1, 2]), (2, [0, 1, 2]), ..., (63, [0, 1, 2])]
        # In each element, e.g. (3, [0, 1, 2]), 3 is the partition number and
        # [0, 1, 2] are the three replicas of that partition. When this method
        # finishes, each of those replicas is mapped to a concrete device.
        # With 4 replicas the element would be (3, [0, 1, 2, 3]).
        for dev in self._iter_devs():
            # sort_key is
            # (dev['parts_wanted'], random.randint(0, 0xFFFF), dev['id']),
            # i.e. (parts_wanted, random int, id)
            dev['sort_key'] = self._sort_key_for(dev)
            dev['tiers'] = tiers_for_dev(dev)
            # Debug output added for this walkthrough:
            print dev['sort_key']
            print dev['tiers']
            # For three devices this printed:
            #   (126, 25509, 0)
            #   ((0,), (0, 0), (0, 0, ''), (0, 0, '', 0))
            #   (126, 55891, 1)
            #   ((1,), (1, 1), (1, 1, ''), (1, 1, '', 1))
            #   (3, 24684, 2)
            #   ((2,), (2, 3), (2, 3, ''), (2, 3, '', 2))
            # so (126, 25509, 0) is
            # (dev['parts_wanted'], random.randint(0, 0xFFFF), dev['id']).
        # sorted() orders by the first element of d['sort_key'] ascending;
        # ties are broken by the second element, then by the third.
        available_devs = \
            sorted((d for d in self._iter_devs() if d['weight']),
                   key=lambda x: x['sort_key'])

        # Sample value of available_devs (the random part of sort_key differs
        # from the output printed above):
        # [{'tiers': ((2,), (2, 3), (2, 3, ''), (2, 3, '', 2)), 'replication_port': 6000, 'zone': 3, 'weight': 3.14159265359, 'sort_key': (3, 57095, 2), 'ip': '', 'region': 2, 'parts': 0, 'id': 2, 'replication_ip': '', 'meta': 'some meta data', 'device': 'sda3', 'parts_wanted': 3, 'port': 6000},
        #  {'tiers': ((0,), (0, 0), (0, 0, ''), (0, 0, '', 0)), 'replication_port': 6000, 'weight': 100.0, 'zone': 0, 'sort_key': (126, 14320, 0), 'ip': '', 'region': 0, 'id': 0, 'replication_ip': '', 'parts': 0, 'meta': 'some meta data', 'device': 'sda1', 'parts_wanted': 126, 'port': 6000},
        #  {'tiers': ((1,), (1, 1), (1, 1, ''), (1, 1, '', 1)), 'replication_port': 6001, 'weight': 100.0, 'zone': 1, 'sort_key': (126, 26347, 1), 'ip': '', 'region': 1, 'id': 1, 'replication_ip': '', 'parts': 0, 'meta': '', 'device': 'sda2', 'parts_wanted': 126, 'port': 6001}]

        # defaultdict was added to Python's collections module in 2.5
        tier2devs = defaultdict(list)
        tier2sort_key = defaultdict(tuple)
        tier2dev_sort_key = defaultdict(list)
        max_tier_depth = 0
        # available_devs is already sorted
        for dev in available_devs:
            for tier in dev['tiers']:
                # one tier key in tier2devs maps to several devs, and those
                # devs are kept in ascending sort_key order
                tier2devs[tier].append(dev)  # <-- starts out sorted!
                # one tier key in tier2dev_sort_key maps to several sort_keys
                tier2dev_sort_key[tier].append(dev['sort_key'])
                # tier2sort_key[tier] ends up holding the sort_key of the last
                # device seen for that tier, i.e. the largest one. Normally
                # that is the device with the largest parts_wanted; devices
                # with equal parts_wanted are ordered by the
                # random.randint(0, 0xFFFF) component. Because that component
                # is random, a partition choosing among devices with the same
                # parts_wanted picks the one with the larger random value, so
                # the choice among equally hungry devices is random.
                tier2sort_key[tier] = dev['sort_key']
                # track the longest tier; (1, 1, '', 1) is the longest form
                if len(tier) > max_tier_depth:
                    max_tier_depth = len(tier)
        # map each tier to its child tiers; e.g. the children of () are
        # (1,), (2,), (3,)
        tier2children_sets = build_tier_tree(available_devs)
        tier2children = defaultdict(list)
        tier2children_sort_key = {}
        tiers_list = [()]
        depth = 1
        while depth <= max_tier_depth:
            new_tiers_list = []
            # the walk starts at the root tier ()
            for tier in tiers_list:
                child_tiers = list(tier2children_sets[tier])
                # Sort the children by their sort_key. For example, if the
                # children of () are [(1,), (2,), (3,)] and
                #   tier2sort_key[(1,)] = (1, 2, 3)
                #   tier2sort_key[(2,)] = (3, 1, 3)
                #   tier2sort_key[(3,)] = (2, 1, 3)
                # the sorted order is [(1,), (3,), (2,)].
                # The sort_key of (1,) is the largest sort_key found below it;
                # the same holds for (2,) and (3,).
                child_tiers.sort(key=tier2sort_key.__getitem__)
                # so every value in tier2children is ordered by sort_key
                tier2children[tier] = child_tiers

                tier2children_sort_key[tier] = map(
                    tier2sort_key.__getitem__, child_tiers)
                new_tiers_list.extend(child_tiers)
            # after the last pass tiers_list holds elements such as
            # (2, 3, '', 2), i.e. (region, zone, ip:port, id) of each device
            tiers_list = new_tiers_list
            depth += 1
        # each item looks like (1, [0, 1, 2]); a ring has 2**power partitions
        for part, replace_replicas in reassign_parts:
            # Gather up what other tiers (regions, zones, ip/ports, and
            # devices) the replicas not-to-be-moved are in for this part.
            other_replicas = defaultdict(int)
            unique_tiers_by_tier_len = defaultdict(set)
            # _replicas_for_part returns the list of replicas for this part
            for replica in self._replicas_for_part(part):
                if replica not in replace_replicas:
                    dev = self.devs[self._replica2part2dev[replica][part]]
                    for tier in dev['tiers']:
                        other_replicas[tier] += 1
                        unique_tiers_by_tier_len[len(tier)].add(tier)

            for replica in replace_replicas:
                tier = ()
                depth = 1
                # this loop orders the tiers by how many replicas of the
                # current partition they already hold
                while depth <= max_tier_depth:
                    # Order the tiers by how many replicas of this
                    # partition they already have. Then, of the ones
                    # with the smallest number of replicas, pick the
                    # tier with the hungriest drive and then continue
                    # searching in that subtree.
                    #
                    # There are other strategies we could use here,
                    # such as hungriest-tier (i.e. biggest
                    # sum-of-parts-wanted) or picking one at random.
                    # However, hungriest-drive is what was used here
                    # before, and it worked pretty well in practice.
                    #
                    # Note that this allocator will balance things as
                    # evenly as possible at each level of the device
                    # layout. If your layout is extremely unbalanced,
                    # this may produce poor results.
                    #
                    # This used to be a cute, recursive function, but it's been
                    # unrolled for performance.

                    # We sort the tiers here so that, when we look for a tier
                    # with the lowest number of replicas, the first one we
                    # find is the one with the hungriest drive (i.e. drive
                    # with the largest sort_key value). This lets us
                    # short-circuit the search while still ensuring we get the
                    # right tier.
                    candidates_with_replicas = \
                        unique_tiers_by_tier_len[len(tier) + 1]
                    # Find a tier with the minimal replica count and the
                    # hungriest drive among all the tiers with the minimal
                    # replica count.
                    if len(tier2children[tier]) > \
                            len(candidates_with_replicas):
                        # There exists at least one tier with 0 other replicas
                        tier = max((t for t in tier2children[tier]
                                    if other_replicas[t] == 0),
                                   key=tier2sort_key.__getitem__)
                    else:
                        tier = max(tier2children[tier],
                                   key=lambda t: (-other_replicas[t],
                                                  tier2sort_key[t]))
                    depth += 1
                # The steps above select one tier; this part deserves a
                # careful read.

                # tier2devs[tier] is kept in ascending sort_key order, so the
                # last entry is the device with the largest sort_key
                dev = tier2devs[tier][-1]
                dev['parts_wanted'] -= 1
                dev['parts'] += 1
                old_sort_key = dev['sort_key']
                # parts_wanted and parts changed, so a new sort_key is
                # generated
                new_sort_key = dev['sort_key'] = self._sort_key_for(dev)
                # and the device has to be re-inserted at its new position
                for tier in dev['tiers']:
                    other_replicas[tier] += 1
                    unique_tiers_by_tier_len[len(tier)].add(tier)

                    # locate old_sort_key in tier2dev_sort_key[tier];
                    # bisect_left only finds the position, it inserts nothing
                    index = bisect.bisect_left(tier2dev_sort_key[tier],
                                               old_sort_key)
                    # drop the entries for the old sort_key from tier2devs
                    # and tier2dev_sort_key
                    tier2devs[tier].pop(index)
                    tier2dev_sort_key[tier].pop(index)

                    # find the position for the new sort_key and insert the
                    # device there in both lists
                    new_index = bisect.bisect_left(tier2dev_sort_key[tier],
                                                   new_sort_key)
                    tier2devs[tier].insert(new_index, dev)
                    tier2dev_sort_key[tier].insert(new_index, new_sort_key)

                    # keep tier2sort_key[tier] equal to the largest sort_key
                    # in this tier
                    new_last_sort_key = tier2dev_sort_key[tier][-1]
                    tier2sort_key[tier] = new_last_sort_key

                    # Now jiggle tier2children values to keep them sorted
                    # the parent tier, e.g. the parent of (1, 2) is (1,)
                    parent_tier = tier[0:-1]
                    index = bisect.bisect_left(
                        tier2children_sort_key[parent_tier],
                        old_sort_key)
                    popped = tier2children[parent_tier].pop(index)
                    tier2children_sort_key[parent_tier].pop(index)

                    new_index = bisect.bisect_left(
                        tier2children_sort_key[parent_tier],
                        new_last_sort_key)
                    tier2children[parent_tier].insert(new_index, popped)
                    tier2children_sort_key[parent_tier].insert(
                        new_index, new_last_sort_key)

                self._replica2part2dev[replica][part] = dev['id']
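The selection rule in _reassign_parts (fewest replicas of this partition first, then the hungriest drive) can be seen on a toy ring. The Python 3 sketch below is a simplification under stated assumptions, not the builder's algorithm: tiers_for, pick_device and assign_replicas are hypothetical names, the tier tree is rebuilt on every call instead of being kept sorted with bisect, the tier tuple uses the ip alone, and ties are broken deterministically by the tier tuple rather than by the random part of sort_key.

```python
from collections import defaultdict


def tiers_for(dev):
    """Simplified stand-in for tiers_for_dev: region, zone, ip, device id."""
    region, zone, ip, dev_id = dev['region'], dev['zone'], dev['ip'], dev['id']
    return ((region,), (region, zone), (region, zone, ip),
            (region, zone, ip, dev_id))


def pick_device(devs, other_replicas):
    """Walk down the tier tree; at each level take the child tier holding the
    fewest replicas of this partition, breaking ties by the hungriest drive."""
    children = defaultdict(set)   # tier -> child tiers
    hunger = {}                   # tier -> largest parts_wanted below it
    leaf2dev = {}
    for dev in devs:
        parent = ()
        for tier in tiers_for(dev):
            children[parent].add(tier)
            hunger[tier] = max(hunger.get(tier, dev['parts_wanted']),
                               dev['parts_wanted'])
            parent = tier
        leaf2dev[parent] = dev
    tier = ()
    while children[tier]:
        tier = max(children[tier],
                   key=lambda t: (-other_replicas[t], hunger[t], t))
    return leaf2dev[tier]


def assign_replicas(devs, replica_count):
    """Place replica_count replicas of one partition; return the device ids."""
    other_replicas = defaultdict(int)
    chosen = []
    for _ in range(replica_count):
        dev = pick_device(devs, other_replicas)
        dev['parts_wanted'] -= 1
        for tier in tiers_for(dev):
            other_replicas[tier] += 1
        chosen.append(dev['id'])
    return chosen


devs = [
    {'id': 0, 'region': 0, 'zone': 0, 'ip': '10.0.0.1', 'parts_wanted': 126},
    {'id': 1, 'region': 1, 'zone': 1, 'ip': '10.0.0.2', 'parts_wanted': 126},
    {'id': 2, 'region': 2, 'zone': 3, 'ip': '10.0.0.3', 'parts_wanted': 3},
]
chosen = assign_replicas(devs, 3)
print(chosen)
```

The device with parts_wanted of 3 is picked last, but it is still picked: once the other two regions each hold a replica, having zero replicas outweighs being less hungry.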


        builder.get_ring().save(
            pathjoin(backup_dir, '%d.' % ts + basename(ring_file)))
        builder.save(pathjoin(backup_dir, '%d.' % ts + basename(argv[1])))

    def __init__(self, replica2part2dev_id, devs, part_shift):
        self.devs = devs
        self._replica2part2dev_id = replica2part2dev_id
        self._part_shift = part_shift
        # each dev looks like
        # dev0 = {'region': 1, 'zone': 1, 'ip': '', 'port': '6000', 'id': 0}
        for dev in self.devs:
            if dev is not None:
                dev.setdefault("region", 1)
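As the function above shows, what actually gets serialized is replica2part2dev_id (the replica-to-partition-to-device mapping), devs (all the devices), and part_shift (the number of bits to shift right).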
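To make those three pieces concrete, here is a small Python 3 sketch that builds them by hand for a 64-partition, 3-replica ring. It is an illustration under stated assumptions rather than Swift's serialization code: the device values are made up, the round-robin assignment merely stands in for the builder's output, devices_for is a hypothetical helper, and part_shift is taken to be 32 minus the partition power so that a 32-bit hash prefix shifted right lands in the range of partition numbers.

```python
from array import array

part_power = 6                    # 2**6 = 64 partitions
parts = 2 ** part_power
part_shift = 32 - part_power      # assumed relation to the 32-bit hash prefix
replicas = 3

# devs: list index == device id (made-up values)
devs = [{'id': i, 'region': i, 'zone': i, 'ip': '10.0.0.%d' % (i + 1),
         'port': 6000, 'device': 'sda%d' % (i + 1)} for i in range(3)]

# replica2part2dev_id[replica][part] -> device id
replica2part2dev_id = [
    array('H', ((part + r) % len(devs) for part in range(parts)))
    for r in range(replicas)]


def devices_for(part):
    """Look up the device dict of every replica of one partition."""
    return [devs[r2p2d[part]] for r2p2d in replica2part2dev_id]


print(part_shift, [d['id'] for d in devices_for(5)])
```

The mapping is one array per replica, each as long as the number of partitions, so a lookup is two index operations.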


2 Using the Ring data


class Ring(object):
    """
    Partitioned consistent hashing ring.

    :param serialized_path: path to serialized RingData instance
    :param reload_time: time interval in seconds to check for a ring change
    """

    def __init__(self, serialized_path, reload_time=15, ring_name=None):
        # can't use the ring unless HASH_PATH_SUFFIX is set
        validate_configuration()
        if ring_name:
            self.serialized_path = os.path.join(serialized_path,
                                                ring_name + '.ring.gz')
        else:
            self.serialized_path = os.path.join(serialized_path)
        self.reload_time = reload_time
        self._reload(force=True)

    def _reload(self, force=False):
        self._rtime = time() + self.reload_time
        if force or self.has_changed():
            ring_data = RingData.load(self.serialized_path)
            self._mtime = getmtime(self.serialized_path)
            self._devs = ring_data.devs
            # NOTE(akscram): Replication parameters like replication_ip
            #                and replication_port are required for
            #                replication process. An old replication
            #                ring doesn't contain this parameters into
            #                device. Old-style pickled rings won't have
            #                region information.
            for dev in self._devs:
                if dev:
                    dev.setdefault('region', 1)
                    if 'ip' in dev:
                        dev.setdefault('replication_ip', dev['ip'])
                    if 'port' in dev:
                        dev.setdefault('replication_port', dev['port'])

            self._replica2part2dev_id = ring_data._replica2part2dev_id
            self._part_shift = ring_data._part_shift
            self._rebuild_tier_data()

            # Do this now, when we know the data has changed, rather then
            # doing it on every call to get_more_nodes().
            regions = set()
            zones = set()
            ip_ports = set()
            self._num_devs = 0
            for dev in self._devs:
                if dev:
                    regions.add(dev['region'])
                    zones.add((dev['region'], dev['zone']))
                    ip_ports.add((dev['region'], dev['zone'],
                                  dev['ip'], dev['port']))
                    self._num_devs += 1
            self._num_regions = len(regions)
            self._num_zones = len(zones)
            self._num_ip_ports = len(ip_ports)

    def _rebuild_tier_data(self):
        self.tier2devs = defaultdict(list)
        for dev in self._devs:
            if not dev:
                continue
            for tier in tiers_for_dev(dev):
                self.tier2devs[tier].append(dev)

        tiers_by_length = defaultdict(list)
        for tier in self.tier2devs:
            tiers_by_length[len(tier)].append(tier)
        self.tiers_by_length = sorted(tiers_by_length.values(),
                                      key=lambda x: len(x[0]))
        for tiers in self.tiers_by_length:
            tiers.sort()

    @property
    def replica_count(self):
        """Number of replicas (full or partial) used in the ring."""
        return len(self._replica2part2dev_id)

    @property
    def partition_count(self):
        """Number of partitions in the ring."""
        return len(self._replica2part2dev_id[0])

    @property
    def devs(self):
        """devices in the ring"""
        # the ring file is checked for changes every reload_time seconds
        # (15 by default)
        if time() > self._rtime:
            self._reload()
        return self._devs

    def has_changed(self):
        """
        Check to see if the ring on disk is different than the current one in
        memory.

        :returns: True if the ring on disk has changed, False otherwise
        """
        return getmtime(self.serialized_path) != self._mtime

    def _get_part_nodes(self, part):
        part_nodes = []
        seen_ids = set()
        for r2p2d in self._replica2part2dev_id:
            if part < len(r2p2d):
                dev_id = r2p2d[part]
                if dev_id not in seen_ids:
                    part_nodes.append(self.devs[dev_id])
                seen_ids.add(dev_id)
        return part_nodes

    # used by _replicate_object in common/db_replicator.py
    def get_part(self, account, container=None, obj=None):
        """
        Get the partition for an account/container/object.

        :param account: account name
        :param container: container name
        :param obj: object name
        :returns: the partition number
        """
        key = hash_path(account, container, obj, raw_digest=True)
        if time() > self._rtime:
            self._reload()
        part = struct.unpack_from('>I', key)[0] >> self._part_shift
        return part

    def get_part_nodes(self, part):
        """
        Get the nodes that are responsible for the partition. If one
        node is responsible for more than one replica of the same
        partition, it will only appear in the output once.

        :param part: partition to get nodes for
        :returns: list of node dicts

        See :func:`get_nodes` for a description of the node dicts.
        """

        if time() > self._rtime:
            self._reload()
        return self._get_part_nodes(part)

    def get_nodes(self, account, container=None, obj=None):
        """
        Get the partition and nodes for an account/container/object.
        If a node is responsible for more than one replica, it will
        only appear in the output once.

        :param account: account name
        :param container: container name
        :param obj: object name
        :returns: a tuple of (partition, list of node dicts)

        Each node dict will have at least the following keys:

        ======  ===============================================================
        id      unique integer identifier amongst devices
        weight  a float of the relative weight of this device as compared to
                others; this indicates how many partitions the builder will try
                to assign to this device
        zone    integer indicating which zone the device is in; a given
                partition will not be assigned to multiple devices within the
                same zone
        ip      the ip address of the device
        port    the tcp port of the device
        device  the device's name on disk (sdb1, for example)
        meta    general use 'extra' field; for example: the online date, the
                hardware description
        ======  ===============================================================
        """
        part = self.get_part(account, container, obj)
        return part, self._get_part_nodes(part)

    def get_more_nodes(self, part):
        """
        Generator to get extra nodes for a partition for hinted handoff.

        The handoff nodes will try to be in zones other than the
        primary zones, will take into account the device weights, and
        will usually keep the same sequences of handoffs even with
        ring changes.

        :param part: partition to get handoff nodes for
        :returns: generator of node dicts

        See :func:`get_nodes` for a description of the node dicts.
        """
        if time() > self._rtime:
            self._reload()
        primary_nodes = self._get_part_nodes(part)

        used = set(d['id'] for d in primary_nodes)
        same_regions = set(d['region'] for d in primary_nodes)
        same_zones = set((d['region'], d['zone']) for d in primary_nodes)
        same_ip_ports = set((d['region'], d['zone'], d['ip'], d['port'])
                            for d in primary_nodes)
        parts = len(self._replica2part2dev_id[0])
        # struct.unpack_from(fmt, buffer, offset=0) unpacks the buffer, which
        # contains packed C structure data, according to fmt, starting at
        # offset. It requires len(buffer[offset:]) >= calcsize(fmt).
        # 'I' is an unsigned int; '>I' reads it in big-endian byte order.
        start = struct.unpack_from(
            '>I', md5(str(part)).digest())[0] >> self._part_shift
        inc = int(parts / 65536) or 1
        # Multiple loops for execution speed; the checks and bookkeeping get
        # simpler as you go along
        hit_all_regions = len(same_regions) == self._num_regions
        for handoff_part in chain(xrange(start, parts, inc),
                                  xrange(inc - ((parts - start) % inc),
                                         start, inc)):
            if hit_all_regions:
                # At this point, there are no regions left untouched, so we
                # can stop looking.
                break
            for part2dev_id in self._replica2part2dev_id:
                if handoff_part < len(part2dev_id):
                    dev_id = part2dev_id[handoff_part]
                    dev = self._devs[dev_id]
                    region = dev['region']
                    if dev_id not in used and region not in same_regions:
                        yield dev
                        used.add(dev_id)
                        same_regions.add(region)
                        zone = dev['zone']
                        ip_port = (region, zone, dev['ip'], dev['port'])
                        same_zones.add((region, zone))
                        same_ip_ports.add(ip_port)
                        if len(same_regions) == self._num_regions:
                            hit_all_regions = True
                            break

        hit_all_zones = len(same_zones) == self._num_zones
        for handoff_part in chain(xrange(start, parts, inc),
                                  xrange(inc - ((parts - start) % inc),
                                         start, inc)):
            if hit_all_zones:
                # Much like we stopped looking for fresh regions before, we
                # can now stop looking for fresh zones; there are no more.
                break
            for part2dev_id in self._replica2part2dev_id:
                if handoff_part < len(part2dev_id):
                    dev_id = part2dev_id[handoff_part]
                    dev = self._devs[dev_id]
                    zone = (dev['region'], dev['zone'])
                    if dev_id not in used and zone not in same_zones:
                        yield dev
                        used.add(dev_id)
                        same_zones.add(zone)
                        ip_port = zone + (dev['ip'], dev['port'])
                        same_ip_ports.add(ip_port)
                        if len(same_zones) == self._num_zones:
                            hit_all_zones = True
                            break

        hit_all_ip_ports = len(same_ip_ports) == self._num_ip_ports
        for handoff_part in chain(xrange(start, parts, inc),
                                  xrange(inc - ((parts - start) % inc),
                                         start, inc)):
            if hit_all_ip_ports:
                # We've exhausted the pool of unused backends, so stop
                # looking.
                break
            for part2dev_id in self._replica2part2dev_id:
                if handoff_part < len(part2dev_id):
                    dev_id = part2dev_id[handoff_part]
                    dev = self._devs[dev_id]
                    ip_port = (dev['region'], dev['zone'],
                               dev['ip'], dev['port'])
                    if dev_id not in used and ip_port not in same_ip_ports:
                        yield dev
                        used.add(dev_id)
                        same_ip_ports.add(ip_port)
                        if len(same_ip_ports) == self._num_ip_ports:
                            hit_all_ip_ports = True
                            break

        hit_all_devs = len(used) == self._num_devs
        for handoff_part in chain(xrange(start, parts, inc),
                                  xrange(inc - ((parts - start) % inc),
                                         start, inc)):
            if hit_all_devs:
                # We've used every device we have, so let's stop looking for
                # unused devices now.
                break
            for part2dev_id in self._replica2part2dev_id:
                if handoff_part < len(part2dev_id):
                    dev_id = part2dev_id[handoff_part]
                    if dev_id not in used:
                        yield self._devs[dev_id]
                        used.add(dev_id)
                        if len(used) == self._num_devs:
                            hit_all_devs = True
                            break
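All four passes of get_more_nodes walk the partitions in the same order, produced by the chain(...) expression. The Python 3 sketch below isolates that expression and a stripped-down version of the last pass (skip devices already used). handoff_order and handoffs are hypothetical names, the table is made-up data, and the region, zone and ip/port passes are left out.

```python
from itertools import chain


def handoff_order(start, parts, inc):
    """The partition visiting order used by each pass."""
    return list(chain(range(start, parts, inc),
                      range(inc - ((parts - start) % inc), start, inc)))


def handoffs(replica2part2dev_id, primary_ids, start, inc=1):
    """Simplified final pass: yield every device id not used so far."""
    used = set(primary_ids)
    parts = len(replica2part2dev_id[0])
    for handoff_part in handoff_order(start, parts, inc):
        for part2dev_id in replica2part2dev_id:
            dev_id = part2dev_id[handoff_part]
            if dev_id not in used:
                used.add(dev_id)
                yield dev_id


order = handoff_order(5, 8, 1)
table = [[0, 1, 2, 3, 4, 5, 0, 1],      # replica 0: part -> device id
         [1, 2, 3, 4, 5, 0, 1, 2]]      # replica 1: part -> device id
extra = list(handoffs(table, primary_ids=[0, 1], start=5))
print(order, extra)
```

With these inputs the order is [5, 6, 7, 1, 2, 3, 4]: the walk starts at the hashed position, wraps around, and as the expression is written the second range begins at 1 when inc is 1. Because the start position depends only on the partition number, the same partition yields its handoff devices in the same order on every call.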

def get_part(self, account, container=None, obj=None):

def get_part_nodes(self, part):

def get_nodes(self, account, container=None, obj=None):

def get_more_nodes(self, part):



Figure 1: How the ring works

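The relationships shown in the figure above work as follows. Suppose we want to store an object. First, the first four bytes (32 bits) of the MD5 hash of the object's account/container/object path are shifted right by part_shift bits, which gives the partition number for that object. Then replica2part2dev_id (the replica-to-partition-to-device mapping) is used to find the ids of the three replica devices for that partition. With the ids in hand, the concrete dev entries are looked up in devs. Because each dev records the device's ip, port, and the disk that stores the data, a request is created for each of the three devices and the data is written to them.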
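The lookup just described fits in a few lines. This is a minimal Python 3 sketch under stated assumptions, not Swift's Ring class: the way the path and suffix are combined before hashing is simplified, HASH_PATH_SUFFIX holds a made-up value, the ring is a plain dict built by hand, and the account, container and object names are invented.

```python
import struct
from array import array
from hashlib import md5

HASH_PATH_SUFFIX = b'changeme'   # hypothetical per-cluster secret


def get_part(account, container, obj, part_shift):
    """First 4 bytes of the MD5 digest, big-endian, shifted right."""
    path = '/' + '/'.join((account, container, obj))
    key = md5(path.encode('utf-8') + HASH_PATH_SUFFIX).digest()
    return struct.unpack_from('>I', key)[0] >> part_shift


def get_nodes(account, container, obj, ring):
    """Partition number plus the device dict of each distinct replica."""
    part = get_part(account, container, obj, ring['part_shift'])
    nodes, seen_ids = [], set()
    for r2p2d in ring['replica2part2dev_id']:
        dev_id = r2p2d[part]
        if dev_id not in seen_ids:
            nodes.append(ring['devs'][dev_id])
        seen_ids.add(dev_id)
    return part, nodes


part_power = 6
ring = {
    'part_shift': 32 - part_power,
    'devs': [{'id': i, 'ip': '10.0.0.%d' % (i + 1), 'port': 6000,
              'device': 'sda%d' % (i + 1)} for i in range(3)],
    'replica2part2dev_id': [
        array('H', ((part + r) % 3 for part in range(2 ** part_power)))
        for r in range(3)],
}
part, nodes = get_nodes('AUTH_test', 'photos', 'cat.jpg', ring)
print(part, [(n['ip'], n['port'], n['device']) for n in nodes])
```

Each returned node carries the ip, port and device name, which is exactly what a client needs to open one request per replica.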















