• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python shelve.open函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中shelve.open函数的典型用法代码示例。如果您正苦于以下问题:Python open函数的具体用法?Python open怎么用?Python open使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了open函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: __init__

    def __init__( self, prefix, nruns=None ):
        self.prefix = prefix
        self.files = np.sort( np.array( glob.glob( prefix + "???/cmonkey_run.db" ) ) ) # get all cmonkey_run.db files

        self.cms = shelve.open('egrin2_shelf.db', protocol=2, writeback=False)
        keys = self.cms.keys()
        for f in self.files:
            index = os.path.dirname(f).replace(self.prefix,'')
            if index in keys:
                continue
            try:
                print f
                b = cm2(f)
                self.cms[ index ] = b
            except:
                print 'ERROR: %s' % (f)
                e = sys.exc_info()[0]
                print e

            if nruns is not None and len(self.cms) >= nruns:
                break

        #self.tables = { tname: self.concat_tables(tname) for tname in self.cms['001'].tables.keys() }
        self.tables = shelve.open('egrin2_tables_shelf.db', protocol=2, writeback=False)
        keys = self.tables.keys()
        for tname in self.cms['001'].tables.keys():
            if tname in keys:
                continue
            tab = self.concat_tables(tname)
            self.tables[str(tname)] = tab
开发者ID:scalefreegan,项目名称:egrin2-tools,代码行数:30,代码来源:egrin2obj.py


示例2: _open_cache

	def _open_cache(cls):
		cls.lock.acquire()
		try:
			cls.cache = shelve.open(cls.cache_filename)
		except UnicodeError as e:
			cls.log.exception("Warning: Failed to open "+cls.cache_filename+": "+unicode(e))
			# This can happen with unicode characters in the path because the bdb module
			# on win converts it internally to ascii, which fails
			# The shelve module therefore does not support writing to paths containing non-ascii characters in general,
			# which means we cannot store data persistently.
			cls.cache = DummyShelve()
		except Exception as e:
			# 2 causes for this:

			# Some python distributions used to use the bsddb module as underlying shelve.
			# The bsddb module is now deprecated since 2.6 and is not present in some 2.7 distributions.
			# Therefore, the line above will yield an ImportError if it has been executed with
			# a python supporting bsddb and now is executed again with python with no support for it.
			# Since this may also be caused by a different error, we just try again once.

			# If there is an old database file that was created with a
			# deprecated dbm library, opening it will fail with an obscure exception, so we delete it
			# and simply retry.
			cls.log.exception("Warning: You probably have an old cache file; deleting and retrying: "+unicode(e))
			if os.path.exists(cls.cache_filename):
				os.remove(cls.cache_filename)
			cls.cache = shelve.open(cls.cache_filename)
		cls.lock.release()
开发者ID:MasterofJOKers,项目名称:unknown-horizons,代码行数:28,代码来源:yamlcache.py


示例3: harvest_program_pages

def harvest_program_pages(program_ids, from_cache=True):
    """
    Download the source of program pages efficiently.
    :param program_ids: A collection of UQ Program IDs.
    :return: A list of web page sources
    """

    if from_cache:
        with shelve.open('.cache/local_shelf') as db_read:
            sources = db_read.get('program_pages')
        if sources:
            return sources

    prog_url_template = 'https://www.uq.edu.au/study/program.html?acad_prog={}'
    prog_urls = (prog_url_template.format(prog_id) for prog_id in program_ids)

    print('About to download {} pages'.format(len(program_ids)))
    with futures.ThreadPoolExecutor(max_workers=10) as executor:
        page_futures = [executor.submit(requests.get, prog_url) for prog_url in prog_urls]

    print('Downloading...')

    # Wait to load all results
    results = futures.wait(page_futures)

    sources = [completed.result().content for completed in results.done]
    print('Downloaded {} pages'.format(len(sources)))

    with shelve.open('.cache/local_shelf') as db_write:
        db_write['program_pages'] = sources

    return sources
开发者ID:kamikai,项目名称:Cosc3000_Project1,代码行数:32,代码来源:programs.py


示例4: __init__

 def __init__(self, indexname, truncate=None):
     dict.__init__(self)
     try:
         if truncate:
             # In python 1.52 and before, dumbdbm (under shelve)
             # doesn't clear the old database.
             files = [indexname + '.dir',
                      indexname + '.dat',
                      indexname + '.bak'
                      ]
             for file in files:
                 if os.path.exists(file):
                     os.unlink(file)
             raise Exception("open a new shelf")
         self.data = shelve.open(indexname, flag='r')
     except:
         # No database exists.
         self.data = shelve.open(indexname, flag='n')
         self.data[self.__version_key] = self.__version
     else:
         # Check to make sure the database is the correct version.
         version = self.data.get(self.__version_key, None)
         if version is None:
             raise IOError("Unrecognized index format")
         elif version != self.__version:
             raise IOError("Version %s doesn't match my version %s"
                           % (version, self.__version))
开发者ID:DavidCain,项目名称:biopython,代码行数:27,代码来源:Index.py


示例5: __init__

    def __init__(self, rememberer_name, config, saml_client, wayf, cache,
                 sid_store=None, discovery="", idp_query_param="",
                 sid_store_cert=None,):
        self.rememberer_name = rememberer_name
        self.wayf = wayf
        self.saml_client = saml_client
        self.conf = config
        self.cache = cache
        self.discosrv = discovery
        self.idp_query_param = idp_query_param
        self.logout_endpoints = [urlparse(ep)[2] for ep in config.endpoint(
            "single_logout_service")]
        try:
            self.metadata = self.conf.metadata
        except KeyError:
            self.metadata = None
        if sid_store:
            self.outstanding_queries = shelve.open(sid_store, writeback=True)
        else:
            self.outstanding_queries = {}
        if sid_store_cert:
            self.outstanding_certs = shelve.open(sid_store_cert, writeback=True)
        else:
            self.outstanding_certs = {}

        self.iam = platform.node()
开发者ID:5monkeys,项目名称:pysaml2,代码行数:26,代码来源:sp.py


示例6: _cache

    def _cache(fn, *args, **kwargs):
        if cache.disabled:
            return fn(*args, **kwargs)

        # A bit obscure, but simplest way to generate unique key for
        # functions and methods in python 2 and 3:
        key = "{}.{}".format(fn.__module__, repr(fn).split("at")[0])

        etag = ".".join(_get_mtime(name) for name in depends_on)
        cache_path = _get_cache_path()

        try:
            with closing(shelve.open(cache_path)) as db:
                if db.get(key, {}).get("etag") == etag:
                    return db[key]["value"]
                else:
                    value = fn(*args, **kwargs)
                    db[key] = {"etag": etag, "value": value}
                    return value
        except shelve_open_errors:
            # Caused when going from Python 2 to Python 3 and vice-versa
            warn("Removing possibly out-dated cache")
            os.remove(cache_path)

            with closing(shelve.open(cache_path)) as db:
                value = fn(*args, **kwargs)
                db[key] = {"etag": etag, "value": value}
                return value
开发者ID:IINT3LIIG3NCII,项目名称:thefuck,代码行数:28,代码来源:utils.py


示例7: _cache

    def _cache(fn, *args, **kwargs):
        if cache.disabled:
            return fn(*args, **kwargs)

        # A bit obscure, but simplest way to generate unique key for
        # functions and methods in python 2 and 3:
        key = '{}.{}'.format(fn.__module__, repr(fn).split('at')[0])

        etag = '.'.join(_get_mtime(name) for name in depends_on)
        cache_dir = get_cache_dir()
        cache_path = Path(cache_dir).joinpath('thefuck').as_posix()

        try:
            with closing(shelve.open(cache_path)) as db:
                if db.get(key, {}).get('etag') == etag:
                    return db[key]['value']
                else:
                    value = fn(*args, **kwargs)
                    db[key] = {'etag': etag, 'value': value}
                    return value
        except (shelve_open_error, ImportError):
            # Caused when switching between Python versions
            warn("Removing possibly out-dated cache")
            os.remove(cache_path)

            with closing(shelve.open(cache_path)) as db:
                value = fn(*args, **kwargs)
                db[key] = {'etag': etag, 'value': value}
                return value
开发者ID:Googulator,项目名称:thefuck,代码行数:29,代码来源:utils.py


示例8: download

    def download(self, url):
        cached  = False
        refresh = False
        cache = shelve.open(os.path.join(os.environ["pydna_data_dir"], "web"), protocol=cPickle.HIGHEST_PROTOCOL, writeback=False)
        key = str(url)

        if os.environ["pydna_cache"] in ("compare", "cached"):
            try:
                cached = cache[key]
            except KeyError:
                if os.environ["pydna_cache"] == "compare":
                    raise Exception("no result for this key!")
                else:
                    refresh = True

        if refresh or os.environ["pydna_cache"] in ("compare", "refresh", "nocache"):
            response = urllib2.urlopen(url)
            result = response.read()

        if os.environ["pydna_cache"] == "compare":
            if result!=cached:
                module_logger.warning('download error')

        if refresh or os.environ["pydna_cache"] == "refresh":
            cache = shelve.open(os.path.join(os.environ["pydna_data_dir"],"genbank"), protocol=cPickle.HIGHEST_PROTOCOL, writeback=False)
            cache[key] = result

        elif cached and os.environ["pydna_cache"] not in ("nocache", "refresh"):
            result = cached

        cache.close()

        return result
开发者ID:souravsingh,项目名称:pydna,代码行数:33,代码来源:download.py


示例9: get_packet

    def get_packet(self, session_id: int=0, *struct_labels):

        # type checking
        if not isinstance(session_id, int):
            raise TypeError("session_id must be an int and not %s" % str(session_id))
        if len(struct_labels) < 1:
            raise ValueError("struct_labels must specify the path to a leaf.")

        # open log_index_file as read-only
        log_index_file = shelve.open(self.log_path, flag='r', protocol=3, writeback=False)

       # find the desired session dir
        if session_id > 0:
            if session_id > log_index_file[self.idx_num_session_key]:
                raise ValueError('session_id must be <= %d' % log_index_file[self.idx_num_session_key])

            session_dir = log_index_file[str(session_id)]
        else:
            curr_session = int(log_index_file[self.idx_num_session_key])
            if curr_session + session_id < 1:
                raise ValueError('Current session is only %d' % curr_session)

            session_dir = log_index_file[str(curr_session + session_id)]

        session_path = os.path.join(os.path.dirname(self.log_path), session_dir, session_dir)
        session_shelf = shelve.open(session_path, flag='r', protocol=3, writeback=False)

        return session_shelf[self.encode_struct(*struct_labels)]
开发者ID:rbgorbet,项目名称:Hylozoic-Series-3,代码行数:28,代码来源:data_logger.py


示例10: __init__

    def __init__(self, host, username, password, debug=False):
        """
        Server class __init__ which expects an IMAP host to connect to

        @param host: gmail's default server is fine: imap.gmail.com
        @param username: your gmail account (i.e. [email protected])
        @param password: we highly recommend you to use 2-factor auth here
        """

        if not host:
            raise Exception('Missing IMAP host parameter in your config')

        try:
            self._server = IMAPClient(host, use_uid=True, ssl=True)
        except:
            raise Exception('Could not successfully connect to the IMAP host')

        setattr(self._server, 'debug', debug)

        # mails index to avoid unnecessary redownloading
        index = '.index_%s' % (username)
        index = os.path.join(_app_folder(), index)
        self._index = shelve.open(index, writeback=True)

        # list of attachments hashes to avoid dupes
        hashes = '.hashes_%s' % (username)
        hashes = os.path.join(_app_folder(), hashes)
        self._hashes = shelve.open(hashes, writeback=True)

        self._username = username
        self._login(username, password)
开发者ID:B-Rich,项目名称:Lost-Photos-Found,代码行数:31,代码来源:server.py


示例11: fpstore

def fpstore(servername,fullhash,headerhashes,fullfn='staticfull',headersfn='staticheaders'):
    import shelve,logging
    log = logging.getLogger("fpstore")
    fulldb = shelve.open(fullfn)
    headersdb = shelve.open(headersfn)    
    if fulldb.has_key(fullhash):
        log.debug("fulldb has this key already defined")
        if servername not in fulldb[fullhash]:
            log.debug("server not already in therefore appending")
            fulldb[fullhash].append(servername)
        else:
            log.debug("server known therefore not appending")
    else:
        log.debug("key not defined therefore creating")
        fulldb[fullhash] = [servername]
    for headerhash in headerhashes:
        if headersdb.has_key(headerhash):
            if servername not in headersdb[headerhash]:
                headersdb[headerhash].append(servername)
        else:
            headersdb[headerhash] = [servername]
    fulldb.sync()
    fulldb.close()
    headersdb.sync()
    headersdb.close()
开发者ID:1EDTHEMAN1,项目名称:raspberry_pwn,代码行数:25,代码来源:fphelper.py


示例12: extract_saturation

def extract_saturation(input_db_name, output_db_name):

	input_db = shelve.open(input_db_name)
	output_db = shelve.open(output_db_name, 'c')

	for key in input_db:
		try:
			url = input_db[key]['url']
			req = urllib.urlopen(url)
			arr = np.asarray(bytearray(req.read()), dtype=np.uint8)
			im 	= cv2.imdecode(arr, -1)
				
			width, height, depth = im.shape
			imsize = width * height

			hsv_im 	= cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
			sat 	= cv2.split(hsv_im)[1]

			val = float(np.sum(sat)) / float(imsize)

			output_db[key] = {'index':input_db[key]['index'], 'url':input_db[key]['url'], 'saturation':val}

		except:
			pass

	input_db.close()
	output_db.close()
开发者ID:AGhiuta,项目名称:image-clustering,代码行数:27,代码来源:saturation.py


示例13: __init__

	def __init__(self, parent):
		Frame.__init__(self, parent)
		self.scrolltext = ScrolledText(self, width = 120,
				font = ("", 14, "normal"), height = 30)
		self.scrolltext.config(state = "disable")

		f = Frame(self)
		self.entry = Entry(f, width = 75, font = ("", 15, "normal"))
#		self.entry.grab_set()
		self.entry.focus_set()
		self.entry.grid(row = 0, column = 0)
		self.entry.bind("<Return>", lambda event, frame = self: frame._runCommand())

		self.button1 = Button(f, text = "sumbmit", font = ("", 15, "normal"),
						command = lambda frame = self: frame._runCommand())
		self.button1.grid(row = 0, column = 1, padx = 4, pady = 2)
		self.button2 = Button(f, text = "clear", font = ("", 15, "normal"),
						command = lambda frame = self: frame._deleteText())
		self.button2.grid(row = 0, column = 2, padx = 4, pady = 2)
		f.grid(row = 1, column = 0)
		self.scrolltext.grid(row = 0, column = 0)

		with shelve.open("userconfigdb") as db:
			keys = tuple(db.keys())
		if not keys:
			configButtonCmd()
		with shelve.open("userconfigdb") as db:
			self.sshIP = db[tuple(db.keys())[0]].hostaddress
		self._configButtonCmd()
开发者ID:wearyoung,项目名称:daovision,代码行数:29,代码来源:gui.py


示例14: get_matlab_versions

def get_matlab_versions(overwrite=False):
    '''Get MATLAB versions from Wikipedia.

    Args:
        overwrite (bool) : Overwrite existing data
    Returns:
        Dictionary of MATLAB versions
        
    '''
    
    # Get version file
    version_file = '%s/matlab-versions.shelf' % (trendpath.data_dir)

    # Used saved versions if version file exists and not overwrite
    if os.path.exists(version_file) and not overwrite:
        shelf = shelve.open(version_file)
        versions = shelf['versions']
        shelf.close()
        return versions

    # Open Wikipedia page
    req = requests.get('http://en.wikipedia.org/wiki/MATLAB')
    soup = BS(req.text)

    # Find Release History table
    histtxt = soup.find(text=re.compile('release history', re.I))
    histspn = histtxt.findParent('span')
    histtab = histspn.findNext('table', {'class' : 'wikitable'})
    histrow = histtab.findAll('tr')
    
    # Initialize Matlab versions
    versions = {}

    for row in histrow[1:]:
        
        # Get <td> elements
        tds = row.findAll('td')

        # Get version number
        vernum = tds[0].text
        vernum = re.sub('matlab\s+', '', vernum, flags=re.I)

        # Get version name
        vernam = tds[1].text
        vernam = re.sub('r', 'r?', vernam, flags=re.I)
        vernam = re.sub('sp', '%s(?:sp|service pack)%s' % delimrep(2), \
            vernam, flags=re.I)

        # Add to versions
        versions[vernum] = [vernum]
        if vernam:
            versions[vernum].append(vernam)
    
    # Save results to version file
    shelf = shelve.open(version_file)
    shelf['versions'] = versions
    shelf.close()

    # Return versions
    return versions
开发者ID:dengemann,项目名称:neurotrends,代码行数:60,代码来源:lang.py


示例15: plot_hist_events

def plot_hist_events(bin_size):
    f = shelve.open("%s_%s.dat" % (fupbin, bin_size))
    t = shelve.open("%s_%s.dat" % (tupbin, bin_size))
    fevents = []  # a list of events for every flcikr user per bin
    tevents = []  # a list of events for every twitter user per bin

    def add(x, y):
        return x + y

    map(lambda uname: fevents.append(reduce(add, map(lambda y: y[1], f[uname]))), f.keys())
    map(lambda uname: tevents.append(reduce(add, map(lambda y: y[1], t[uname]))), t.keys())
    fig1 = plt.figure(figsize=(10, 14))
    fevents = np.array(fevents)
    tevents = np.array(tevents)
    ax1 = fig1.add_subplot(211)
    ax1.hist(fevents, np.unique(fevents), cumulative=-1)
    ax1.set_xscale("log")
    ax1.set_yscale("log")
    ax1.set_xlabel("# of flickr events {%s hr bins}" % (bin_size))
    ax1.set_ylabel("# of users {%s hr bins}" % (bin_size))
    ax1.set_title("flickr events frequencies {%s hr bins}" % (bin_size))
    ax2 = fig1.add_subplot(212)
    ax2.hist(tevents, np.unique(tevents), cumulative=-1)
    ax2.set_xscale("log")
    ax2.set_yscale("log")
    ax2.set_xlabel("# of twitter events {%s hr bins}" % (bin_size))
    ax2.set_ylabel("# of users {%s hr bins}" % (bin_size))
    ax2.set_title("Twitter events frequencies {%s hr bins}" % (bin_size))
    fig1.savefig("%s/flick-events-hist_%s.pdf" % (fig_save, bin_size))
    f.close()
    t.close()
开发者ID:knkumar,项目名称:tweet_analysis,代码行数:31,代码来源:plothist.py


示例16: add_job

    def add_job(self, command, hour, minute, sec=0):

        logger.info("2. scheduler adding job command: %s at %s:%s:%s" % (
            command, hour, minute, sec
        ))
        sched = Scheduler(standalone=True)

        #make a db file
        shelve.open(
            os.path.join(
                os.path.dirname(__file__),
                'example.db'
            )
        )
        sched.add_jobstore(ShelveJobStore('example.db'), 'shelve')

        exec_time = datetime(
            date.today().year,
            date.today().month,
            date.today().day,
            int(hour),
            int(minute),
            int(sec)
        )
        #test
        #exec_time = datetime.now() + timedelta(seconds=5)

        sched.add_date_job(
            job,
            exec_time,
            name='alarm',
            jobstore='shelve',
            args=[command]
        )
        sched.start()
开发者ID:vsilent,项目名称:smarty-bot,代码行数:35,代码来源:at.py


示例17: __init__

    def __init__(self, func, options):
        self.func = func
        self.options = options
        if 'input' not in self.options:
            self.options['input'] = []
        self.options['input'] = _list(self.options['input'])

        filename = inspect.getfile(sys._getframe(2))
        self.working_dir = os.path.dirname(os.path.realpath(filename))

        # The database of remembered results
        memory_dir = self.memory_dir()
        db_file = '{}/{}'.format(memory_dir, self.func.func_name)
        self.results_db = shelve.open(db_file, writeback=True)
        flag_file = '{}/{}.flag'.format(memory_dir, self.func.func_name)

        if self.should_run(flag_file):
            # Clear the database of existing results since these must be out of date
            if os.path.exists(flag_file):
                os.unlink(flag_file)

            self.results_db.clear()
            open(flag_file, 'w').close() # touch the flag

        # Remember the byte code for the function so it gets run again if it changes
        current_code_string = marshal.dumps(func.func_code)
        if '-code-' not in self.results_db or current_code_string != self.results_db['-code-']:
            # nuke the old and create a new ... the old base based on old code
            if os.path.exists(flag_file):
                os.unlink(flag_file)
            self.results_db.clear()
            self.results_db = shelve.open(db_file)
            self.results_db['-code-'] = current_code_string
            open(flag_file, 'w').close() # touch the flag
开发者ID:birc-aeh,项目名称:gwf,代码行数:34,代码来源:__init__.py


示例18: GetTransitionModelFromShelf

 def GetTransitionModelFromShelf(self,yy,mm,dd,numDays,posNoise=None,currentNoise=None,shelfDirectory='.'):
     """ Loads up Transition models from the shelf for a given number of days, starting from a particular day, and a given amount of noise in position and/or a given amount of noise in the current predictions. We assume these models have been created earlier using ProduceTransitionModels.
         
         Args:
             * yy (int): year
             * mm (int): month
             * dd (int): day
             * numDays (int): number of days the model is being built over
             * posNoise (float): Amount of std-deviation of the random noise used in picking the start location
             * currentNoise (float): Amount of prediction noise in the ocean model
             * shelfDirectory (str): Directory in which the Transition models are located.
         
         Updates:
             * self.gm.FinalLocs: Stores the final locations 
     """
     self.posNoise = posNoise; 
     self.currentNoise = currentNoise 
     #import pdb; pdb.set_trace()
     if posNoise==None and currentNoise==None:
         gmShelf = shelve.open('%s/gliderModel_%04d%02d%02d_%d.shelf'%(shelfDirectory,yy,mm,dd,numDays), writeback=False )
     if posNoise!=None:
         if currentNoise!=None:
             gmShelf = shelve.open('%s/gliderModel_%04d%02d%02d_%d_%.3f_RN_%.3f.shelf'%(shelfDirectory,yy,mm,dd,numDays,posNoise,currentNoise),writeback=False)
         else:
             gmShelf = shelve.open('%s/gliderModel_%04d%02d%02d_%d_%.3f.shelf'%(shelfDirectory,yy,mm,dd,numDays,posNoise), writeback=False)
     if posNoise==None and currentNoise!=None:
         gmShelf=shelve.open('%s/gliderModel_%04d%02d%02d_%d_RN_%.3f.shelf'%(shelfDirectory,yy,mm,dd,numDays,currentNoise), writeback=False)     
     self.gm.TransModel = gmShelf['TransModel']
     #if gmShelf.has_key('FinalLocs'):
     self.gm.FinalLocs = gmShelf['FinalLocs']
     #if gmShelf.has_key('TracksInModel'):
     self.gm.TracksInModel = gmShelf['TracksInModel']
     gmShelf.close()
     # Now that we have loaded the new transition model, we better update our graph.
     self.ReInitializeMDP()
开发者ID:amp-at-clover,项目名称:gpplib,代码行数:35,代码来源:StateActionMDP.py


示例19: _buildIndexCacheFile

    def _buildIndexCacheFile(self):
	import shelve
	import os
	print "Building %s:" % (self.shelfname,),
	tempname = self.shelfname + ".temp"
	try:
	    indexCache = shelve.open(tempname)
	    self.rewind()
	    count = 0
	    while 1:
		offset, line = self.file.tell(), self.file.readline()
		if not line: break
		key = line[:string.find(line, ' ')]
		if (count % 1000) == 0:
		    print "%s..." % (key,),
		    import sys
		    sys.stdout.flush()
		indexCache[key] = line
		count = count + 1
	    indexCache.close()
	    os.rename(tempname, self.shelfname)
	finally:
	    try: os.remove(tempname)
	    except: pass
	print "done."
	self.indexCache = shelve.open(self.shelfname, 'r')
开发者ID:AdityaChaudhary,项目名称:NatI,代码行数:26,代码来源:wordnet.py


示例20: _cache

    def _cache(fn, *args, **kwargs):
        if cache.disabled:
            return fn(*args, **kwargs)

        # A bit obscure, but simplest way to generate unique key for
        # functions and methods in python 2 and 3:
        key = '{}.{}'.format(fn.__module__, repr(fn).split('at')[0])

        etag = '.'.join(_get_mtime(name) for name in depends_on)
        cache_path = _get_cache_path()

        try:
            with closing(shelve.open(cache_path)) as db:
                if db.get(key, {}).get('etag') == etag:
                    return db[key]['value']
                else:
                    value = fn(*args, **kwargs)
                    db[key] = {'etag': etag, 'value': value}
                    return value
        except dbm.error:
            # Caused when going from Python 2 to Python 3
            warn("Removing possibly out-dated cache")
            os.remove(cache_path)

            with closing(shelve.open(cache_path)) as db:
                value = fn(*args, **kwargs)
                db[key] = {'etag': etag, 'value': value}
                return value
开发者ID:sproutman,项目名称:thefuck,代码行数:28,代码来源:utils.py



注:本文中的shelve.open函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python data.DataPHA类代码示例发布时间:2022-05-27
下一篇:
Python shell_utils.run函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap