• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python storage.load函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中storage.load函数的典型用法代码示例。如果您正苦于以下问题:Python load函数的具体用法?Python load怎么用?Python load使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了load函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _generate_index

 def _generate_index(self, session):
     store = storage.load()
     Base.metadata.create_all(self._engine)
     session.add(Version(id=self.version))
     for repository in self._walk_storage(store=store):
         session.add(Repository(**repository))
     session.commit()
开发者ID:rivik,项目名称:docker-registry,代码行数:7,代码来源:db.py


示例2: validate_token

def validate_token(auth):
    full_repos_name = auth.get('repository', '').split('/')
    if len(full_repos_name) != 2:
        logger.debug('validate_token: Invalid repository field')
        return False
    cfg = config.load()
    index_endpoint = cfg.index_endpoint
    if index_endpoint is None:
        index_endpoint = 'https://index.docker.io'
    index_endpoint = index_endpoint.strip('/')
    url = '{0}/v1/repositories/{1}/{2}/images'.format(index_endpoint,
                                                      full_repos_name[0],
                                                      full_repos_name[1])
    headers = {'Authorization': flask.request.headers.get('authorization')}
    resp = requests.get(url, verify=True, headers=headers)
    logger.debug('validate_token: Index returned {0}'.format(resp.status_code))
    if resp.status_code != 200:
        return False
    store = storage.load()
    try:
        images_list = [i['id'] for i in json.loads(resp.text)]
        store.put_content(store.images_list_path(*full_repos_name),
                          json.dumps(images_list))
    except json.JSONDecodeError:
        logger.debug('validate_token: Wrong format for images_list')
        return False
    return True
开发者ID:GaretJax,项目名称:docker-registry,代码行数:27,代码来源:toolkit.py


示例3: execute

 def execute(self):
     books = storage.load(self._configuration, 'library')
     fields = set()
     for book in books:
         for field in book.keys():
             if field[0] is not '_':
                 fields.add(field)
     self.out('\n'.join(fields))
     return None, None
开发者ID:TomRegan,项目名称:roots,代码行数:9,代码来源:command.py


示例4: _query

def _query(configuration, restrict='title', select=None):
    books = storage.load(configuration, 'library')
    if select is None:
        return books
    else:
        return [book for book in books
                if restrict in book.keys()
                and select.upper()
                in unicode(book[restrict]).upper()]
开发者ID:TomRegan,项目名称:roots,代码行数:9,代码来源:command.py


示例5: wrapper

    def wrapper(namespace, repository, *args, **kwargs):
        cfg = config.load()
        mirroring_cfg = cfg.mirroring
        resp = f(namespace, repository, *args, **kwargs)
        if not mirroring_cfg:
            return resp
        source = mirroring_cfg['source']
        tags_cache_ttl = mirroring_cfg.get('tags_cache_ttl',
                                           DEFAULT_CACHE_TAGS_TTL)

        if resp.status_code != 404:
            logger.debug('Status code is not 404, no source '
                         'lookup required')
            return resp

        if not cache.redis_conn:
            # No tags cache, just return
            logger.warning('mirroring: Tags cache is disabled, please set a '
                           'valid `cache\' directive in the config.')
            source_resp = lookup_source(
                flask.request.path, stream=False, source=source
            )
            if not source_resp:
                return resp
            return toolkit.response(data=source_resp.content,
                                    headers=source_resp.headers, raw=True)

        store = storage.load()
        request_path = flask.request.path

        if request_path.endswith('/tags'):
            # client GETs a list of tags
            tag_path = store.tag_path(namespace, repository)
        else:
            # client GETs a single tag
            tag_path = store.tag_path(namespace, repository, kwargs['tag'])

        data = cache.redis_conn.get('{0}:{1}'.format(
            cache.cache_prefix, tag_path
        ))
        if data is not None:
            return toolkit.response(data=data, raw=True)
        source_resp = lookup_source(
            flask.request.path, stream=False, source=source
        )
        if not source_resp:
            return resp
        data = source_resp.content
        cache.redis_conn.setex('{0}:{1}'.format(
            cache.cache_prefix, tag_path
        ), tags_cache_ttl, data)
        return toolkit.response(data=data, headers=source_resp.headers,
                                raw=True)
开发者ID:Anvil,项目名称:docker-registry,代码行数:53,代码来源:mirroring.py


示例6: storage_status

def storage_status():
    message = ''
    try:
        _storage = storage.load(_config.storage)
        key = toolkit.gen_random_string()
        value = toolkit.gen_random_string()
        _storage.put_content(key, value)
        stored_value = _storage.get_content(key)
        _storage.remove(key)
        if value != stored_value:
            message = 'Set value is different from what was received'
    except Exception as e:
        message = str(e)
    return {'storage': message}
开发者ID:1uptalent,项目名称:docker-registry,代码行数:14,代码来源:status.py


示例7: __init__

    def __init__(self, nickname, server, port=6667):
        SingleServerIRCBot.__init__(self, [(server, port)], nickname, nickname)
        self.ircobj.add_global_handler("all_events", self.dispatcher, 0)

        self.logged_in = False
        
        loghandler.Handler(self)
        self.storage = storage.load()
        self.storage.dirty_cb = self.storage_dirty_cb

        self.ipc = ipc.ListeningConnection(self.ircobj)

        self.timers = []
        Dekisu.instance = self
        Timer.bot = self
开发者ID:dequis,项目名称:dekisu,代码行数:15,代码来源:dekisu.py


示例8: __init__

 def __init__(self, configuration):
     term = configuration['terminal']
     self._configuration = configuration
     try:
         db = storage.load(configuration, 'isbndb')
         self._rate = db['rate']
         if self._rate.date < date.today():
             term.debug("Resetting limit, expired %s" % self._rate.date)
             self._rate = Rate(limit=configuration['isbndb']['limit'],
                               date=date.today())
     except:
         self._rate = Rate(limit=configuration['isbndb']['limit'],
                           date=date.today())
     if self._rate is not None:
         term.debug('%s ISBNDB requests permitted on %s.',
                    self._rate.limit, self._rate.date)
     else:
         term.debug('ISBNDB requests not limited.')
开发者ID:TomRegan,项目名称:roots,代码行数:18,代码来源:isbndb.py


示例9: docker_pull

 def docker_pull(self, namespace, repos):
     # Test pull
     # Docker -> Index
     resp = requests.get('{0}/v1/repositories/{1}/{2}/images'.format(
         self.index_endpoint, namespace, repos),
         auth=tuple(self.user_credentials),
         headers={'X-Docker-Token': 'true'})
     self.assertEqual(resp.status_code, 200)
     token = resp.headers.get('x-docker-token')
     # Here we should use the 'X-Endpoints' returned in a real environment
     # Docker -> Registry
     resp = requests.get('{0}/v1/repositories/{1}/{2}/tags/latest'.format(
                         self.registry_endpoint, namespace, repos),
                         headers={'Authorization': 'Token ' + token})
     self.assertEqual(resp.status_code, 200, resp.text)
     self.cookies = resp.cookies
     # Docker -> Registry
     image_id = json.loads(resp.text)
     resp = requests.get('{0}/v1/images/{1}/ancestry'.format(
         self.registry_endpoint, image_id),
         cookies=self.cookies)
     self.update_cookies(resp)
     self.assertEqual(resp.status_code, 200, resp.text)
     ancestry = json.loads(resp.text)
     # We got the ancestry, let's fetch all the images there
     for image_id in ancestry:
         json_data, checksum, blob = self.fetch_image(image_id)
         # check queried checksum and local computed checksum from the image
         # are the same
         tmpfile = StringIO.StringIO()
         tmpfile.write(blob)
         tmpfile.seek(0)
         computed_checksum = checksums.compute_simple(tmpfile, json_data)
         tmpfile.close()
         self.assertEqual(checksum, computed_checksum)
     # Remove image tags
     resp = requests.delete('{0}/v1/repositories/{1}/{2}/tags'.format(
         self.registry_endpoint, namespace, repos), cookies=self.cookies)
     self.assertEqual(resp.status_code, 200, resp.text)
     self.update_cookies(resp)
     # Remove image_id, then parent_id
     store = storage.load()
     store.remove(os.path.join(store.images, self.image_id))
     store.remove(os.path.join(store.images, self.parent_id))
开发者ID:23critters,项目名称:docker-registry,代码行数:44,代码来源:workflow.py


示例10: load_graph

def load_graph(fname):
    """ Loads a graph from the given file
    """
    sav = storage.load(fname)

    ver = sav['version']
    
    SAVE_FORMAT_VERSION = 5
    if ver > SAVE_FORMAT_VERSION:
        print "File format version {} incompatible!".format(ver)
        sys.exit()

    leaf = sav['leaf']
    tree = sav['tree']
    filt = sav['filtration']
    remv = sav['removed-edges']
    prun = sav['pruned']

    return leaf, tree, filt, remv, prun
开发者ID:hronellenfitsch,项目名称:nesting,代码行数:19,代码来源:analyzer.py


示例11: load_state

 def load_state(self):
     self.voted_for, self.current_term, self.log = storage.load(self.id)
     # print "voted for", self.voted_for
     print self.current_term
     print self.log
开发者ID:Zlash92,项目名称:CS271-RAFT,代码行数:5,代码来源:server.py


示例12: len

import storage

count = storage.load()
print "Total patterns: ", len(storage.TREE.patterns)
print "Total messages: ", count
开发者ID:gmlexx,项目名称:ErrorsDigest,代码行数:5,代码来源:test_storage.py


示例13: _load_map_points_from_model

 def _load_map_points_from_model(self):
     model = storage.load(self._args.model)[0]
     return model.normalized_observed_reductions
开发者ID:gaborpapp,项目名称:AIam,代码行数:3,代码来源:test_navigator.py


示例14: setUp

 def setUp(self):
     self._storage = storage.load('local')
开发者ID:ehazlett,项目名称:docker-registry,代码行数:2,代码来源:test_storage.py


示例15: setUp

 def setUp(self):
     self._storage = storage.load('swift')
     self._storage._swift_connection.put_container(
         self._storage._swift_container
     )
开发者ID:1uptalent,项目名称:docker-registry,代码行数:5,代码来源:test_swift_storage.py


示例16: UDPServer

#!/usr/bin/python

import sys
import resources
import storage
from twisted.internet.protocol import DatagramProtocol
from twisted.web import server
from twisted.application import service, internet
from twisted.python import log
from django.conf import settings

storage.load()

settings.configure(TEMPLATE_DIRS=('templates',))

class UDPServer(DatagramProtocol):
	def datagramReceived(self, datagram, address):
 		data = datagram.strip().decode('cp1251')
		storage.put(data)

class Service(service.Service):
	def startService(self):
		log.startLogging(sys.stdout)
		service.Service.startService(self)
		log.msg('ErrorDigest server started')

_topService = service.MultiService()

_service = Service()
_service.setServiceParent(_topService)
开发者ID:gmlexx,项目名称:ErrorsDigest,代码行数:30,代码来源:server.py


示例17: ImproviseParameters

        improvise_params,
        preferred_location,
        MAX_NOVELTY)

improvise_params = ImproviseParameters()
improvise_params.set_values_from_args(args)
improvise_behaviors = {
    model_name: _create_improvise_behavior(model_name)
    for model_name in MODELS}

set_up_logging()

index = 0
memory = Memory()
if args.memory:
    memory.set_frames(storage.load(args.memory))
recall_behavior = RecallBehavior()
master_behavior = MasterBehavior() 
avatar = Avatar(index, master_entity, master_behavior)

def clear_memory():
    recall_behavior.reset()
    memory.clear()
            
avatars = [avatar]

application = Application(
    students[args.model], avatars, args, receive_from_pn=True, create_entity=create_entity, z_up=Z_UP)

set_model(args.model)
set_max_angular_step(args.max_angular_step)
开发者ID:gaborpapp,项目名称:AIam,代码行数:31,代码来源:learn_recall_improvise.py


示例18: load_memory

 def load_memory():
     filename = QtGui.QFileDialog.getLoadFileName(self, "Load memory", filter="Memory (*.mem)")
     if filename:
         memory.set_frames(storage.load(filename))
         recall_behavior.reset()
开发者ID:gaborpapp,项目名称:AIam,代码行数:5,代码来源:learn_recall_improvise.py


示例19: main

def main():
    if len(sys.argv) == 1:
        parser.print_help()
        exit
    args = parser.parse_args()

    if args.fib:
        for x in fib.generate_fib_sequence(args.fib):
            print x

    if args.sto:
        storage = storage.Storage()
        while input != "exit": 
            input = raw_input("Enter command:")
            items = input.split()
            if len(items) == 0:
                print "Command required"
                continue
            command = items[0]
            del(items[0])
            if command == "add":
                for x in items:
                    storage.add(x)
            elif command == "remove":
                for x in items:
                    storage.remove(x)
            elif command == "find":
                for x in items:
                    storage.find(x)
            elif command == "list":
                storage.list()
            elif command == "save":
                print items[0]
                storage.save(items[0])
            elif command == "load":
                storage.load(items[0])
            elif command == "grep":
                storage.grep(items[0])    
            elif command != "exit":
                print "Invalid command"

    if args.text:
        try:
            with open(args.text, "r") as file:
                text = file.read()
        except IOError:
            print "File {} doesn't exist".format(args.text)

        print "Average wordcount:{}".format(text_statistics.average_wordcount(text))
        print "Median wordcount: {}".format(text_statistics.median_wordcount(text))
        print "Word count: {}".format(text_statistics.count_words(text))
        try: 
            n = raw_input("Enter N:")
            number = raw_input("Enter K:")
            print "Top {0} of {1}-grams is:".format(number, n) 
            print text_statistics.top_ngrams(text, int(n), int(number))
        except ValueError:
            print "Invalid input. Integer required!"

    if args.sort:
        try: 
            with open(args.sort, "r") as file:
                ara = file.read().replace("\n", "").replace(" ", "").split(",")
                ara = map(int, ara)
        except IOError:
            print "File {} doesn't exist".format(args.sort)
        print "quick sort"
        sort.demo(ara, sort.quick_sort)
        print "merge sort"
        sort.demo(ara, sort.merge_sort)
        print "radix sort"
        sort.demo(ara, sort.radix_sort)
开发者ID:pokapoka,项目名称:My-Labs,代码行数:72,代码来源:myawesomelab.py


示例20: setUp

 def setUp(self):
     self._cfg = config.load()
     conn = boto.connect_s3(self._cfg.s3_access_key, self._cfg.s3_secret_key)
     conn.create_bucket(self._cfg.s3_bucket)
     self._storage = storage.load('s3')
开发者ID:richardmurri,项目名称:docker-registry,代码行数:5,代码来源:test_s3.py



注:本文中的storage.load函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python storage.Storage类代码示例发布时间:2022-05-27
下一篇:
Python storage.get_storage函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap