本文整理汇总了Python中shelve.open函数的典型用法代码示例。如果您正苦于以下问题：shelve.open函数的具体用法是什么？shelve.open怎么用？有哪些shelve.open的使用示例？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了shelve.open函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self, prefix, nruns=None):
    """Load cMonkey runs found under ``prefix`` into shelves and merge their tables.

    Every file matching ``prefix + "???/cmonkey_run.db"`` is wrapped with
    ``cm2`` and stored in the ``egrin2_shelf.db`` shelf under its run index
    (the directory name with ``prefix`` stripped).  Runs already present in
    the shelf are skipped.  Afterwards one concatenated table per table name
    of run ``'001'`` is stored in ``egrin2_tables_shelf.db``.

    :param prefix: path prefix shared by all run directories.
    :param nruns: optional cap; loading stops once the shelf holds this many runs.
    """
    self.prefix = prefix
    # get all cmonkey_run.db files
    self.files = np.sort(np.array(glob.glob(prefix + "???/cmonkey_run.db")))
    self.cms = shelve.open('egrin2_shelf.db', protocol=2, writeback=False)
    for f in self.files:
        index = os.path.dirname(f).replace(self.prefix, '')
        # Ask the shelf directly instead of scanning a list of all its keys.
        if index in self.cms:
            continue
        try:
            print(f)
            self.cms[index] = cm2(f)
        except Exception:
            # A single broken run must not abort the whole load, but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            print('ERROR: %s' % (f))
            print(sys.exc_info()[0])
        if nruns is not None and len(self.cms) >= nruns:
            break
    self.tables = shelve.open('egrin2_tables_shelf.db', protocol=2, writeback=False)
    for tname in self.cms['001'].tables.keys():
        # Tables are stored under str(tname), so test for that same key.
        table_key = str(tname)
        if table_key in self.tables:
            continue
        self.tables[table_key] = self.concat_tables(tname)
开发者ID:scalefreegan,项目名称:egrin2-tools,代码行数:30,代码来源:egrin2obj.py
示例2: _open_cache
def _open_cache(cls):
    """Open the shelve cache at ``cls.cache_filename`` while holding ``cls.lock``.

    On ``UnicodeError`` the cache falls back to a ``DummyShelve``.  On any
    other error the cache file is deleted and the open is retried once; a
    failure of that retry propagates to the caller.  The lock is released
    on every path, including a failing retry.
    """
    cls.lock.acquire()
    try:
        try:
            cls.cache = shelve.open(cls.cache_filename)
        except UnicodeError as e:
            cls.log.exception("Warning: Failed to open "+cls.cache_filename+": "+unicode(e))
            # This can happen with unicode characters in the path because the bdb module
            # on win converts it internally to ascii, which fails
            # The shelve module therefore does not support writing to paths containing
            # non-ascii characters in general,
            # which means we cannot store data persistently.
            cls.cache = DummyShelve()
        except Exception as e:
            # 2 causes for this:
            # Some python distributions used to use the bsddb module as underlying shelve.
            # The bsddb module is now deprecated since 2.6 and is not present in some 2.7
            # distributions. Therefore, the line above will yield an ImportError if it has
            # been executed with a python supporting bsddb and now is executed again with
            # python with no support for it.
            # Since this may also be caused by a different error, we just try again once.
            # If there is an old database file that was created with a
            # deprecated dbm library, opening it will fail with an obscure exception,
            # so we delete it and simply retry.
            cls.log.exception("Warning: You probably have an old cache file; deleting and retrying: "+unicode(e))
            if os.path.exists(cls.cache_filename):
                os.remove(cls.cache_filename)
            cls.cache = shelve.open(cls.cache_filename)
    finally:
        # Previously skipped when the retry above raised, leaving the lock held forever.
        cls.lock.release()
开发者ID:MasterofJOKers,项目名称:unknown-horizons,代码行数:28,代码来源:yamlcache.py
示例3: harvest_program_pages
def harvest_program_pages(program_ids, from_cache=True):
    """
    Download the source of program pages efficiently.

    Pages are fetched concurrently and the result is stored in the local
    shelf under the key ``'program_pages'``.

    :param program_ids: A sized collection of UQ Program IDs.
    :param from_cache: When true, return the previously stored pages if the
        shelf holds a non-empty entry instead of downloading again.
    :return: A list of web page sources, in the same order as ``program_ids``
        for freshly downloaded pages.
    """
    import os

    shelf_path = '.cache/local_shelf'
    # shelve cannot create missing parent directories on its own.
    os.makedirs(os.path.dirname(shelf_path), exist_ok=True)

    if from_cache:
        with shelve.open(shelf_path) as db_read:
            sources = db_read.get('program_pages')
        if sources:
            return sources

    prog_url_template = 'https://www.uq.edu.au/study/program.html?acad_prog={}'
    prog_urls = (prog_url_template.format(prog_id) for prog_id in program_ids)
    print('About to download {} pages'.format(len(program_ids)))
    with futures.ThreadPoolExecutor(max_workers=10) as executor:
        page_futures = [executor.submit(requests.get, prog_url) for prog_url in prog_urls]
        print('Downloading...')

    # Leaving the executor block has already waited for every future.
    # Reading them in submission order keeps the pages aligned with
    # program_ids; the set returned by futures.wait() has no defined order.
    sources = [page_future.result().content for page_future in page_futures]
    print('Downloaded {} pages'.format(len(sources)))
    with shelve.open(shelf_path) as db_write:
        db_write['program_pages'] = sources
    return sources
开发者ID:kamikai,项目名称:Cosc3000_Project1,代码行数:32,代码来源:programs.py
示例4: __init__
def __init__(self, indexname, truncate=None):
    """Open the shelf-backed index ``indexname``, creating it when needed.

    :param indexname: base path of the shelf.
    :param truncate: when true, discard any existing index and start a new one.
    :raises IOError: if an existing index has no version entry or a version
        different from this class's version.
    """
    dict.__init__(self)
    existing = None
    if truncate:
        # In python 1.52 and before, dumbdbm (under shelve)
        # doesn't clear the old database.
        for suffix in ('.dir', '.dat', '.bak'):
            stale = indexname + suffix
            try:
                if os.path.exists(stale):
                    os.unlink(stale)
            except OSError:
                # Best effort, as before: give up on the clean-up and let
                # flag='n' below create the fresh database.
                break
    else:
        try:
            existing = shelve.open(indexname, flag='r')
        except Exception:
            # No usable database exists; fall through and create one.
            existing = None

    if existing is None:
        self.data = shelve.open(indexname, flag='n')
        self.data[self.__version_key] = self.__version
        return

    self.data = existing
    # Check to make sure the database is the correct version.
    version = self.data.get(self.__version_key, None)
    if version is None:
        raise IOError("Unrecognized index format")
    elif version != self.__version:
        raise IOError("Version %s doesn't match my version %s"
                      % (version, self.__version))
开发者ID:DavidCain,项目名称:biopython,代码行数:27,代码来源:Index.py
示例5: __init__
def __init__(self, rememberer_name, config, saml_client, wayf, cache,
             sid_store=None, discovery="", idp_query_param="",
             sid_store_cert=None,):
    """Store the collaborators and open the optional persistent stores.

    ``sid_store`` and ``sid_store_cert`` are shelf paths; when one is not
    given, the corresponding store is a plain in-memory dict instead.
    """
    self.rememberer_name = rememberer_name
    self.wayf = wayf
    self.saml_client = saml_client
    self.conf = config
    self.cache = cache
    self.discosrv = discovery
    self.idp_query_param = idp_query_param

    # Keep only the path component (index 2) of every single-logout endpoint.
    slo_endpoints = config.endpoint("single_logout_service")
    self.logout_endpoints = [urlparse(endpoint)[2] for endpoint in slo_endpoints]

    try:
        self.metadata = self.conf.metadata
    except KeyError:
        self.metadata = None

    self.outstanding_queries = (
        shelve.open(sid_store, writeback=True) if sid_store else {})
    self.outstanding_certs = (
        shelve.open(sid_store_cert, writeback=True) if sid_store_cert else {})

    self.iam = platform.node()
开发者ID:5monkeys,项目名称:pysaml2,代码行数:26,代码来源:sp.py
示例6: _cache
def _cache(fn, *args, **kwargs):
    """Return ``fn(*args, **kwargs)``, served from the shelve cache when fresh.

    An entry is fresh while its stored etag equals the etag built from the
    modification times of the ``depends_on`` names.  When the shelf cannot
    be used (``shelve_open_errors``) the cache file is removed and rebuilt.
    """
    if cache.disabled:
        return fn(*args, **kwargs)

    # A bit obscure, but simplest way to generate unique key for
    # functions and methods in python 2 and 3.  Split on " at " (with
    # spaces): a bare "at" also matches inside names such as "get_path",
    # which truncated the key and let different functions share an entry.
    key = "{}.{}".format(fn.__module__, repr(fn).split(" at ")[0])
    etag = ".".join(_get_mtime(name) for name in depends_on)
    cache_path = _get_cache_path()

    def _compute_and_store(db):
        # Run the wrapped function and remember its result under the current etag.
        value = fn(*args, **kwargs)
        db[key] = {"etag": etag, "value": value}
        return value

    try:
        with closing(shelve.open(cache_path)) as db:
            entry = db.get(key, {})
            if entry.get("etag") == etag:
                return entry["value"]
            return _compute_and_store(db)
    except shelve_open_errors:
        # Caused when going from Python 2 to Python 3 and vice-versa
        warn("Removing possibly out-dated cache")
        os.remove(cache_path)
        with closing(shelve.open(cache_path)) as db:
            return _compute_and_store(db)
开发者ID:IINT3LIIG3NCII,项目名称:thefuck,代码行数:28,代码来源:utils.py
示例7: _cache
def _cache(fn, *args, **kwargs):
    """Return ``fn(*args, **kwargs)``, served from the shelve cache when fresh.

    An entry is fresh while its stored etag equals the etag built from the
    modification times of the ``depends_on`` names.  When the shelf cannot
    be used (``shelve_open_error`` or ``ImportError``) the cache file is
    removed and rebuilt.
    """
    if cache.disabled:
        return fn(*args, **kwargs)

    # A bit obscure, but simplest way to generate unique key for
    # functions and methods in python 2 and 3.  Split on ' at ' (with
    # spaces): a bare 'at' also matches inside names such as 'get_path',
    # which truncated the key and let different functions share an entry.
    key = '{}.{}'.format(fn.__module__, repr(fn).split(' at ')[0])
    etag = '.'.join(_get_mtime(name) for name in depends_on)
    cache_dir = get_cache_dir()
    cache_path = Path(cache_dir).joinpath('thefuck').as_posix()

    def _compute_and_store(db):
        # Run the wrapped function and remember its result under the current etag.
        value = fn(*args, **kwargs)
        db[key] = {'etag': etag, 'value': value}
        return value

    try:
        with closing(shelve.open(cache_path)) as db:
            entry = db.get(key, {})
            if entry.get('etag') == etag:
                return entry['value']
            return _compute_and_store(db)
    except (shelve_open_error, ImportError):
        # Caused when switching between Python versions
        warn("Removing possibly out-dated cache")
        os.remove(cache_path)
        with closing(shelve.open(cache_path)) as db:
            return _compute_and_store(db)
开发者ID:Googulator,项目名称:thefuck,代码行数:29,代码来源:utils.py
示例8: download
def download(self, url):
    """Fetch ``url``, using the "web" shelf according to ``$pydna_cache``.

    Modes read from the ``pydna_cache`` environment variable:

    * ``"cached"``  - return the stored copy; on a miss download and store it.
    * ``"refresh"`` - always download and overwrite the stored copy.
    * ``"nocache"`` - always download, never store.
    * ``"compare"`` - require a stored copy, download anyway, log a warning
      when the two differ, and return the stored copy.

    :raises Exception: in ``"compare"`` mode when nothing is stored for ``url``.
    :raises ValueError: for an unknown mode (this used to surface as an
        ``UnboundLocalError`` at the final ``return``).
    """
    mode = os.environ["pydna_cache"]
    if mode not in ("compare", "cached", "refresh", "nocache"):
        raise ValueError("unknown pydna_cache mode: %r" % (mode,))

    key = str(url)
    cache = shelve.open(os.path.join(os.environ["pydna_data_dir"], "web"),
                        protocol=cPickle.HIGHEST_PROTOCOL, writeback=False)
    try:
        # None marks "nothing stored", so an empty stored body is still a hit.
        cached = None
        if mode in ("compare", "cached"):
            try:
                cached = cache[key]
            except KeyError:
                if mode == "compare":
                    raise Exception("no result for this key!")

        if mode == "cached" and cached is not None:
            return cached

        response = urllib2.urlopen(url)
        result = response.read()

        if mode == "compare":
            if result != cached:
                module_logger.warning('download error')
            return cached

        if mode != "nocache":
            # Store into the shelf that lookups read from.  The original
            # opened a second "genbank" shelf here, so stored pages were
            # never found again and the first handle was leaked.
            cache[key] = result
        return result
    finally:
        cache.close()
开发者ID:souravsingh,项目名称:pydna,代码行数:33,代码来源:download.py
示例9: get_packet
def get_packet(self, session_id: int=0, *struct_labels):
    """Return the stored value at ``struct_labels`` from one logged session.

    :param session_id: a positive value selects that session number; zero or
        a negative value is an offset from the most recent session
        (``0`` is the latest, ``-1`` the one before it).
    :param struct_labels: path to a leaf, passed to ``self.encode_struct``
        to build the key looked up in the session shelf.
    :raises TypeError: if ``session_id`` is not an int.
    :raises ValueError: if no labels are given or the session does not exist.
    """
    # type checking
    if not isinstance(session_id, int):
        raise TypeError("session_id must be an int and not %s" % str(session_id))
    if len(struct_labels) < 1:
        raise ValueError("struct_labels must specify the path to a leaf.")

    # open log_index_file as read-only; the with-block closes it again,
    # which the original never did
    with shelve.open(self.log_path, flag='r', protocol=3, writeback=False) as log_index_file:
        num_sessions = int(log_index_file[self.idx_num_session_key])

        # find the desired session dir
        if session_id > 0:
            if session_id > num_sessions:
                raise ValueError('session_id must be <= %d' % num_sessions)
            session_num = session_id
        else:
            session_num = num_sessions + session_id
            if session_num < 1:
                raise ValueError('Current session is only %d' % num_sessions)
        session_dir = log_index_file[str(session_num)]

    session_path = os.path.join(os.path.dirname(self.log_path), session_dir, session_dir)
    with shelve.open(session_path, flag='r', protocol=3, writeback=False) as session_shelf:
        return session_shelf[self.encode_struct(*struct_labels)]
开发者ID:rbgorbet,项目名称:Hylozoic-Series-3,代码行数:28,代码来源:data_logger.py
示例10: __init__
def __init__(self, host, username, password, debug=False):
    """
    Server class __init__ which expects an IMAP host to connect to

    @param host: gmail's default server is fine: imap.gmail.com
    @param username: your gmail account (i.e. username@gmail.com)
    @param password: we highly recommend you to use 2-factor auth here
    @param debug: value assigned to the IMAP client's ``debug`` attribute
    """
    if not host:
        raise Exception('Missing IMAP host parameter in your config')

    try:
        self._server = IMAPClient(host, use_uid=True, ssl=True)
    except Exception:
        # Only real errors are translated; a bare except also turned
        # KeyboardInterrupt/SystemExit into this generic failure.
        raise Exception('Could not successfully connect to the IMAP host')

    self._server.debug = debug

    # mails index to avoid unnecessary redownloading
    index = os.path.join(_app_folder(), '.index_%s' % (username))
    self._index = shelve.open(index, writeback=True)

    # list of attachments hashes to avoid dupes
    hashes = os.path.join(_app_folder(), '.hashes_%s' % (username))
    self._hashes = shelve.open(hashes, writeback=True)

    self._username = username
    self._login(username, password)
开发者ID:B-Rich,项目名称:Lost-Photos-Found,代码行数:31,代码来源:server.py
示例11: fpstore
def fpstore(servername,fullhash,headerhashes,fullfn='staticfull',headersfn='staticheaders'):
    """Record that ``servername`` produced the given fingerprint hashes.

    Each shelf maps a hash to the list of server names seen with it.
    ``servername`` is added to the list stored under ``fullhash`` in the
    shelf ``fullfn`` and under every hash of ``headerhashes`` in the shelf
    ``headersfn``; a name already present is not added twice.
    """
    import logging
    import shelve
    log = logging.getLogger("fpstore")

    def _add_server(db, key):
        # Add servername to the list stored at db[key]; return True if it was new.
        # The shelves are opened without writeback, so db[key] is a fresh copy
        # on every access: appending to it in place is silently lost.  The
        # updated list has to be assigned back.
        servers = db[key] if key in db else []
        if servername in servers:
            return False
        servers.append(servername)
        db[key] = servers
        return True

    fulldb = shelve.open(fullfn)
    try:
        headersdb = shelve.open(headersfn)
        try:
            if fullhash in fulldb:
                log.debug("fulldb has this key already defined")
                if _add_server(fulldb, fullhash):
                    log.debug("server not already in therefore appending")
                else:
                    log.debug("server known therefore not appending")
            else:
                log.debug("key not defined therefore creating")
                _add_server(fulldb, fullhash)
            for headerhash in headerhashes:
                _add_server(headersdb, headerhash)
        finally:
            # close() also syncs, so no separate sync() call is needed.
            headersdb.close()
    finally:
        fulldb.close()
开发者ID:1EDTHEMAN1,项目名称:raspberry_pwn,代码行数:25,代码来源:fphelper.py
示例12: extract_saturation
def extract_saturation(input_db_name, output_db_name):
    """Compute the mean saturation of every image listed in a shelf.

    Each entry of the input shelf must provide ``'index'`` and ``'url'``.
    The image behind the URL is downloaded and decoded, and its mean
    HSV saturation is written to the output shelf together with the
    original index and URL.  Entries that cannot be processed are skipped
    and reported through ``logging`` instead of being dropped silently.

    :param input_db_name: path of the shelf to read.
    :param output_db_name: path of the shelf to create or update.
    """
    import logging
    log = logging.getLogger(__name__)

    input_db = shelve.open(input_db_name)
    try:
        output_db = shelve.open(output_db_name, 'c')
        try:
            for key in input_db:
                try:
                    record = input_db[key]
                    url = record['url']
                    req = urllib.urlopen(url)
                    arr = np.asarray(bytearray(req.read()), dtype=np.uint8)
                    im = cv2.imdecode(arr, -1)
                    # Decoded images are (rows, cols[, channels]).
                    height, width = im.shape[:2]
                    imsize = width * height
                    hsv_im = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
                    sat = cv2.split(hsv_im)[1]
                    val = float(np.sum(sat)) / float(imsize)
                    output_db[key] = {'index': record['index'], 'url': url, 'saturation': val}
                except Exception:
                    # Still best effort per image, but no longer invisible, and
                    # KeyboardInterrupt/SystemExit now stop the run.
                    log.warning("skipping %r: could not compute saturation", key, exc_info=True)
        finally:
            output_db.close()
    finally:
        input_db.close()
开发者ID:AGhiuta,项目名称:image-clustering,代码行数:27,代码来源:saturation.py
示例13: __init__
def __init__(self, parent):
    """Build the console frame: an output area above an entry row with two buttons.

    After the widgets are laid out, the first record of the ``userconfigdb``
    shelf supplies ``self.sshIP``.
    """
    Frame.__init__(self, parent)

    control_font = ("", 15, "normal")

    # Output area (row 0).
    self.scrolltext = ScrolledText(self, width=120,
                                   font=("", 14, "normal"), height=30)
    self.scrolltext.config(state="disable")

    # Input row (row 1): entry, submit button, clear button.
    controls = Frame(self)
    self.entry = Entry(controls, width=75, font=control_font)
    # self.entry.grab_set()
    self.entry.focus_set()
    self.entry.grid(row=0, column=0)
    self.entry.bind("<Return>", lambda event: self._runCommand())

    self.button1 = Button(controls, text="sumbmit", font=control_font,
                          command=lambda: self._runCommand())
    self.button1.grid(row=0, column=1, padx=4, pady=2)

    self.button2 = Button(controls, text="clear", font=control_font,
                          command=lambda: self._deleteText())
    self.button2.grid(row=0, column=2, padx=4, pady=2)

    controls.grid(row=1, column=0)
    self.scrolltext.grid(row=0, column=0)

    with shelve.open("userconfigdb") as db:
        keys = tuple(db.keys())
    if not keys:
        # NOTE(review): presumably this populates the shelf - confirm.
        configButtonCmd()
    with shelve.open("userconfigdb") as db:
        first_key = tuple(db.keys())[0]
        self.sshIP = db[first_key].hostaddress
    self._configButtonCmd()
开发者ID:wearyoung,项目名称:daovision,代码行数:29,代码来源:gui.py
示例14: get_matlab_versions
def get_matlab_versions(overwrite=False):
    '''Get MATLAB versions from Wikipedia.

    Args:
        overwrite (bool) : Overwrite existing data

    Returns:
        Dictionary mapping each version number to a list holding that
        number and, when the table provides one, a pattern built from
        the version name.
    '''
    # Get version file
    version_file = '%s/matlab-versions.shelf' % (trendpath.data_dir)

    # Used saved versions if version file exists and not overwrite
    # NOTE(review): some dbm backends append their own file extension, in
    # which case this exact path never exists - confirm on the target platform.
    if os.path.exists(version_file) and not overwrite:
        shelf = shelve.open(version_file)
        try:
            return shelf['versions']
        finally:
            shelf.close()

    # Open Wikipedia page
    req = requests.get('http://en.wikipedia.org/wiki/MATLAB')
    soup = BS(req.text)

    # Find Release History table
    histtxt = soup.find(text=re.compile('release history', re.I))
    histspn = histtxt.findParent('span')
    histtab = histspn.findNext('table', {'class' : 'wikitable'})
    histrow = histtab.findAll('tr')

    # Initialize Matlab versions
    versions = {}

    for row in histrow[1:]:
        # Get <td> elements
        tds = row.findAll('td')
        if len(tds) < 2:
            # Rows without both a number and a name cell used to raise IndexError.
            continue

        # Get version number (raw string: '\s' is not a valid str escape)
        vernum = re.sub(r'matlab\s+', '', tds[0].text, flags=re.I)

        # Get version name
        vernam = tds[1].text
        vernam = re.sub('r', 'r?', vernam, flags=re.I)
        vernam = re.sub('sp', '%s(?:sp|service pack)%s' % delimrep(2),
                        vernam, flags=re.I)

        # Add to versions
        versions[vernum] = [vernum]
        if vernam:
            versions[vernum].append(vernam)

    # Save results to version file
    shelf = shelve.open(version_file)
    try:
        shelf['versions'] = versions
    finally:
        shelf.close()

    # Return versions
    return versions
开发者ID:dengemann,项目名称:neurotrends,代码行数:60,代码来源:lang.py
示例15: plot_hist_events
def plot_hist_events(bin_size):
    """Plot reverse-cumulative histograms of per-user event totals.

    Reads the shelves ``"<fupbin>_<bin_size>.dat"`` and
    ``"<tupbin>_<bin_size>.dat"``, sums the second field of every record
    of each user, and saves a two-panel log-log figure (flickr on top,
    twitter below) under ``fig_save``.

    :param bin_size: bin width in hours; used in the file names and labels.
    """
    f = shelve.open("%s_%s.dat" % (fupbin, bin_size))
    try:
        t = shelve.open("%s_%s.dat" % (tupbin, bin_size))
        try:
            # Total events for every user.  Built with comprehensions: the
            # original used map() purely for its side effects, which does
            # nothing at all where map() is lazy.
            # assumes the second field of each record is a numeric count - TODO confirm
            fevents = np.array([sum(y[1] for y in f[uname]) for uname in f.keys()])  # flickr users
            tevents = np.array([sum(y[1] for y in t[uname]) for uname in t.keys()])  # twitter users
        finally:
            t.close()
    finally:
        f.close()

    fig1 = plt.figure(figsize=(10, 14))

    ax1 = fig1.add_subplot(211)
    ax1.hist(fevents, np.unique(fevents), cumulative=-1)
    ax1.set_xscale("log")
    ax1.set_yscale("log")
    ax1.set_xlabel("# of flickr events {%s hr bins}" % (bin_size))
    ax1.set_ylabel("# of users {%s hr bins}" % (bin_size))
    ax1.set_title("flickr events frequencies {%s hr bins}" % (bin_size))

    ax2 = fig1.add_subplot(212)
    ax2.hist(tevents, np.unique(tevents), cumulative=-1)
    ax2.set_xscale("log")
    ax2.set_yscale("log")
    ax2.set_xlabel("# of twitter events {%s hr bins}" % (bin_size))
    ax2.set_ylabel("# of users {%s hr bins}" % (bin_size))
    ax2.set_title("Twitter events frequencies {%s hr bins}" % (bin_size))

    fig1.savefig("%s/flick-events-hist_%s.pdf" % (fig_save, bin_size))
开发者ID:knkumar,项目名称:tweet_analysis,代码行数:31,代码来源:plothist.py
示例16: add_job
def add_job(self, command, hour, minute, sec=0):
    """Schedule ``job(command)`` to run today at ``hour:minute:sec``.

    A standalone scheduler with a shelve-backed job store is created, the
    one-off job is added under the name ``'alarm'``, and the scheduler is
    started.

    :param command: argument handed to ``job`` when it fires.
    :param hour: hour of today's execution time (converted with ``int``).
    :param minute: minute of the execution time (converted with ``int``).
    :param sec: second of the execution time (converted with ``int``).
    """
    logger.info("2. scheduler adding job command: %s at %s:%s:%s" % (
        command, hour, minute, sec
    ))
    sched = Scheduler(standalone=True)

    # make a db file, and close the handle instead of discarding it open
    db_path = os.path.join(os.path.dirname(__file__), 'example.db')
    shelve.open(db_path).close()

    # NOTE(review): the job store uses the relative path 'example.db', which
    # is resolved against the working directory, not the module directory
    # used just above - confirm which location is intended.
    sched.add_jobstore(ShelveJobStore('example.db'), 'shelve')

    # Read the date once so the three fields cannot straddle midnight.
    today = date.today()
    exec_time = datetime(
        today.year,
        today.month,
        today.day,
        int(hour),
        int(minute),
        int(sec)
    )
    sched.add_date_job(
        job,
        exec_time,
        name='alarm',
        jobstore='shelve',
        args=[command]
    )
    sched.start()
开发者ID:vsilent,项目名称:smarty-bot,代码行数:35,代码来源:at.py
示例17: __init__
def __init__(self, func, options):
    """Wrap ``func`` and open the shelf that remembers its earlier results.

    The remembered results are discarded when ``self.should_run`` reports
    that the flag file is out of date, or when the function's byte code
    differs from the byte code stored with the results.

    :param func: the function being wrapped.
    :param options: option mapping; ``options['input']`` is normalised with
        ``_list`` and defaults to an empty list.
    """
    self.func = func
    self.options = options
    if 'input' not in self.options:
        self.options['input'] = []
    self.options['input'] = _list(self.options['input'])

    filename = inspect.getfile(sys._getframe(2))
    self.working_dir = os.path.dirname(os.path.realpath(filename))

    # The database of remembered results
    memory_dir = self.memory_dir()
    db_file = '{}/{}'.format(memory_dir, self.func.func_name)
    self.results_db = shelve.open(db_file, writeback=True)
    flag_file = '{}/{}.flag'.format(memory_dir, self.func.func_name)

    def discard_results():
        # Remove the flag file and forget every remembered result.
        if os.path.exists(flag_file):
            os.unlink(flag_file)
        self.results_db.clear()

    def touch_flag():
        open(flag_file, 'w').close()

    if self.should_run(flag_file):
        # Clear the database of existing results since these must be out of date
        discard_results()
        touch_flag()

    # Remember the byte code for the function so it gets run again if it changes
    current_code_string = marshal.dumps(func.func_code)
    if '-code-' not in self.results_db or current_code_string != self.results_db['-code-']:
        # nuke the old and create a new ... the old base based on old code
        discard_results()
        # Keep using the shelf opened above.  The original opened the same
        # file a second time here without closing the first handle, and
        # without writeback=True, unlike the normal path.
        self.results_db['-code-'] = current_code_string
        touch_flag()
开发者ID:birc-aeh,项目名称:gwf,代码行数:34,代码来源:__init__.py
示例18: GetTransitionModelFromShelf
def GetTransitionModelFromShelf(self,yy,mm,dd,numDays,posNoise=None,currentNoise=None,shelfDirectory='.'):
    """ Loads up Transition models from the shelf for a given number of days,
    starting from a particular day, and a given amount of noise in position
    and/or a given amount of noise in the current predictions. We assume these
    models have been created earlier using ProduceTransitionModels.

    The shelf file name is
    ``gliderModel_YYYYMMDD_<numDays>[_<posNoise>][_RN_<currentNoise>].shelf``,
    where each noise part is present only when that argument is given and is
    formatted with three decimals.

    Args:
        * yy (int): year
        * mm (int): month
        * dd (int): day
        * numDays (int): number of days the model is being built over
        * posNoise (float): Amount of std-deviation of the random noise used in picking the start location
        * currentNoise (float): Amount of prediction noise in the ocean model
        * shelfDirectory (str): Directory in which the Transition models are located.

    Updates:
        * self.gm.TransModel: Stores the transition model
        * self.gm.FinalLocs: Stores the final locations
        * self.gm.TracksInModel: Stores the tracks in the model
    """
    self.posNoise = posNoise
    self.currentNoise = currentNoise

    # Build the name incrementally; this covers the same four combinations
    # the original spelled out as separate branches.
    shelfName = '%s/gliderModel_%04d%02d%02d_%d' % (shelfDirectory, yy, mm, dd, numDays)
    if posNoise is not None:
        shelfName += '_%.3f' % posNoise
    if currentNoise is not None:
        shelfName += '_RN_%.3f' % currentNoise
    shelfName += '.shelf'

    gmShelf = shelve.open(shelfName, writeback=False)
    try:
        self.gm.TransModel = gmShelf['TransModel']
        self.gm.FinalLocs = gmShelf['FinalLocs']
        self.gm.TracksInModel = gmShelf['TracksInModel']
    finally:
        # Close the shelf even when one of the keys is missing.
        gmShelf.close()

    # Now that we have loaded the new transition model, we better update our graph.
    self.ReInitializeMDP()
开发者ID:amp-at-clover,项目名称:gpplib,代码行数:35,代码来源:StateActionMDP.py
示例19: _buildIndexCacheFile
def _buildIndexCacheFile(self):
    """Build the shelf ``self.shelfname`` from ``self.file`` and open it read-only.

    Every line of the file is stored under the text that precedes its first
    space.  The shelf is written to a temporary name first and renamed once
    it is complete; the temporary file is removed afterwards if it still
    exists.  Progress is written to standard output.
    """
    import os
    import shelve
    import sys

    sys.stdout.write("Building %s: " % (self.shelfname,))
    tempname = self.shelfname + ".temp"
    try:
        indexCache = shelve.open(tempname)
        try:
            self.rewind()
            count = 0
            while 1:
                line = self.file.readline()
                if not line:
                    break
                key = line[:line.find(' ')]
                if (count % 1000) == 0:
                    sys.stdout.write("%s... " % (key,))
                    sys.stdout.flush()
                indexCache[key] = line
                count = count + 1
        finally:
            # Close the temporary shelf on failure too, before it is removed.
            indexCache.close()
        # NOTE(review): assumes the dbm backend stores the shelf in a single
        # file named exactly tempname - confirm for the backend in use.
        os.rename(tempname, self.shelfname)
    finally:
        try:
            os.remove(tempname)
        except OSError:
            # After a successful rename there is nothing left to remove.
            pass
    sys.stdout.write("done.\n")
    self.indexCache = shelve.open(self.shelfname, 'r')
开发者ID:AdityaChaudhary,项目名称:NatI,代码行数:26,代码来源:wordnet.py
示例20: _cache
def _cache(fn, *args, **kwargs):
    """Return ``fn(*args, **kwargs)``, served from the shelve cache when fresh.

    An entry is fresh while its stored etag equals the etag built from the
    modification times of the ``depends_on`` names.  When the shelf cannot
    be used (``dbm.error``) the cache file is removed and rebuilt.
    """
    if cache.disabled:
        return fn(*args, **kwargs)

    # A bit obscure, but simplest way to generate unique key for
    # functions and methods in python 2 and 3.  Split on ' at ' (with
    # spaces): a bare 'at' also matches inside names such as 'get_path',
    # which truncated the key and let different functions share an entry.
    key = '{}.{}'.format(fn.__module__, repr(fn).split(' at ')[0])
    etag = '.'.join(_get_mtime(name) for name in depends_on)
    cache_path = _get_cache_path()

    def _compute_and_store(db):
        # Run the wrapped function and remember its result under the current etag.
        value = fn(*args, **kwargs)
        db[key] = {'etag': etag, 'value': value}
        return value

    try:
        with closing(shelve.open(cache_path)) as db:
            entry = db.get(key, {})
            if entry.get('etag') == etag:
                return entry['value']
            return _compute_and_store(db)
    except dbm.error:
        # Caused when going from Python 2 to Python 3
        warn("Removing possibly out-dated cache")
        os.remove(cache_path)
        with closing(shelve.open(cache_path)) as db:
            return _compute_and_store(db)
开发者ID:sproutman,项目名称:thefuck,代码行数:28,代码来源:utils.py
注：本文中的shelve.open函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论