• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python cache.FileSystemCache类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中werkzeug.contrib.cache.FileSystemCache的典型用法代码示例。如果您正苦于以下问题:Python FileSystemCache类的具体用法?Python FileSystemCache怎么用?Python FileSystemCache使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了FileSystemCache类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: FileSystemSessionInterface

class FileSystemSessionInterface(SessionInterface):
    """Uses the :class:`werkzeug.contrib.cache.FileSystemCache` as a session
    backend.

    :param cache_dir: the directory where session files are stored.
    :param threshold: the maximum number of items the session stores before it
                      starts deleting some.
    :param mode: the file mode wanted for the session files, default 0600
    :param key_prefix: A prefix that is added to FileSystemCache store keys.
    """

    session_class = FileSystemSession

    def __init__(self, cache_dir, threshold, mode, key_prefix):
        from werkzeug.contrib.cache import FileSystemCache
        self.cache = FileSystemCache(cache_dir, threshold=threshold, mode=mode)
        self.key_prefix = key_prefix

    def _generate_sid(self):
        return str(uuid4())

    def open_session(self, app, request):
        sid = request.cookies.get(app.session_cookie_name)
        if not sid:
            sid = self._generate_sid()
            return self.session_class(sid=sid)
        data = self.cache.get(self.key_prefix + sid)
        if data is not None:
            return self.session_class(data, sid=sid)
        return self.session_class(sid=sid)
    
    def save_session(self, app, session, response):
        domain = self.get_cookie_domain(app)
        path = self.get_cookie_path(app)
        if not session:
            if session.modified:
                self.cache.delete(self.key_prefix + session.sid)
                response.delete_cookie(app.session_cookie_name,
                                       domain=domain, path=path)
            return

        # Modification case.  There are upsides and downsides to
        # emitting a set-cookie header each request.  The behavior
        # is controlled by the :meth:`should_set_cookie` method
        # which performs a quick check to figure out if the cookie
        # should be set or not.  This is controlled by the
        # SESSION_REFRESH_EACH_REQUEST config flag as well as
        # the permanent flag on the session itself.
        # if not self.should_set_cookie(app, session):
        #    return

        httponly = self.get_cookie_httponly(app)
        secure = self.get_cookie_secure(app)
        expires = self.get_expiration_time(app, session)
        data = dict(session)
        self.cache.set(self.key_prefix + session.sid, data,
                         int(app.permanent_session_lifetime.total_seconds()))
        response.set_cookie(app.session_cookie_name, session.sid,
                            expires=expires, httponly=httponly,
                            domain=domain, path=path, secure=secure)
开发者ID:x20080406,项目名称:bteditor,代码行数:60,代码来源:sessions.py


示例2: number_gen

def number_gen(first_part, start_num, end_num, semester, dept, subjects):
    driver = webdriver.Firefox()  # Firefox used for testing. Change it to PhantomJS
    driver.implicitly_wait(30)
    base_url = "http://result.pondiuni.edu.in/candidate.asp"
    url = base_url
    driver.get(base_url)
    # os.mkdir(str(first_part))
    os.chdir("results")
    os.chdir(str(first_part))
    cache = FileSystemCache('.cachedir', threshold=100000)
    for number in range(start_num, end_num + 1):
        current_num = "%04d" % number
        numb = first_part + str(current_num)
        driver.find_element_by_id("txtregno").clear()
        driver.find_element_by_id("txtregno").send_keys(numb)
        Select(driver.find_element_by_id("cmbdegree")).select_by_visible_text(dept)
        Select(driver.find_element_by_id("cmbexamno")).select_by_visible_text(semester)
        driver.find_element_by_id("button1").click()

        # copying the content
        page_source = cache.get(url)
        page_source = driver.page_source
        cache.set(url, page_source, timeout=60 * 60 * 24 * 7)  # week in seconds
        root = html.document_fromstring(page_source)
        Cleaner(kill_tags=['noscript'], style=True)(root)  # lxml >= 2.3.1

        # pasting to file
        filename = str(numb) + ".txt"
        fp = open(filename, 'w')
        fp.write((root.text_content()).encode('utf-8'))
        fp.close()
        driver.back()
    driver.close()
    return analyze(subjects)
开发者ID:yoga30696,项目名称:Automation-of-result-analysis,代码行数:34,代码来源:server.py


示例3: check_cache

 def check_cache( self, bib_id ):
     """ Checks cache for marc. """
     cache = FileSystemCache( self.cache_dir, threshold=500, default_timeout=self.cache_hours, mode=0664 )  # http://werkzeug.pocoo.org/docs/0.9/contrib/cache/
     cache_key = bib_id
     marc = cache.get( cache_key )
     if marc == None:
         self.log.debug( u'in app_helper.Helper.check_cache(); marc not found in cache' )
     else:
         self.log.debug( u'in app_helper.Helper.check_cache(); marc found in cache' )
     return ( marc, cache )
开发者ID:birkin,项目名称:addto_refworks,代码行数:10,代码来源:app_helper.py


示例4: setUp

    def setUp(self):
        """BaseModel test set up"""

        if os.path.isfile("/tmp/box.db"):
            os.unlink("/tmp/box.db")
        DBHelper().set_db("/tmp/box.db")
        InstallHelper.reset()
        cache = FileSystemCache("/tmp/werkzeug")
        cache.clear()
        BaseModel.set_cache(cache)
        SampleModel.install()
开发者ID:henkelund,项目名称:memobox,代码行数:11,代码来源:test_base.py


示例5: test_filesystemcache_prune

def test_filesystemcache_prune():
    """
    test if FileSystemCache._prune works and keeps the cache entry count
    below the given threshold.
    """
    THRESHOLD = 13
    tmp_dir = tempfile.mkdtemp()
    cache = FileSystemCache(cache_dir=tmp_dir, threshold=THRESHOLD)
    for i in range(2 * THRESHOLD):
        cache.set(str(i), i)
    cache_files = os.listdir(tmp_dir)
    shutil.rmtree(tmp_dir)
    assert len(cache_files) <= THRESHOLD
开发者ID:EnTeQuAk,项目名称:werkzeug,代码行数:13,代码来源:test_cache.py


示例6: test_filesystemcache_clear

def test_filesystemcache_clear():
    """
    test if FileSystemCache.clear works
    """
    tmp_dir = tempfile.mkdtemp()
    cache = FileSystemCache(cache_dir=tmp_dir)
    cache.set("foo", "bar")
    cache_files = os.listdir(tmp_dir)
    assert len(cache_files) == 1
    cache.clear()
    cache_files = os.listdir(tmp_dir)
    assert len(cache_files) == 0
    shutil.rmtree(tmp_dir)
开发者ID:EnTeQuAk,项目名称:werkzeug,代码行数:13,代码来源:test_cache.py


示例7: test_filesystemcache_set_get

def test_filesystemcache_set_get():
    """
    test if FileSystemCache.set/get works
    """
    tmp_dir = tempfile.mkdtemp()
    try:
        cache = FileSystemCache(cache_dir=tmp_dir)
        for i in range(3):
            cache.set(str(i), i * i)
        for i in range(3):
            result = cache.get(str(i))
            assert result == i * i
    finally:
        shutil.rmtree(tmp_dir)
开发者ID:EnTeQuAk,项目名称:werkzeug,代码行数:14,代码来源:test_cache.py


示例8: BaseProvider

class BaseProvider(object):

    __metaclass__ = ABCMeta

    def __init__(self, cache_dir=None, default_timeout=60 * 60 * 24,
                 api_key=None):
        # store it in cache for 1 day. using file system cache because
        # memcached is too mainstream. :)
        self.cache = FileSystemCache(cache_dir=cache_dir or '/tmp/__arcoiro__',
                                     default_timeout=default_timeout)
        self._api_key = api_key

    @abstractproperty
    def name(self):
        pass

    @abstractproperty
    def url(self):
        pass

    @abstractmethod
    def get_urls_from_tag(self, tag):
        pass

    @property
    def display_name(self):
        return self.name

    @property
    def api_key(self):
        if self._api_key is not None:
            return self._api_key
        config_key = '%s_API_KEY' % self.name.upper()
        key = current_app.config.get(config_key)
        if key is None:
            raise RuntimeError('%s not defined!' % config_key)
        return key

    def get_cached_urls_from_tag(self, tag):
        cache_key = '%s:%s' % (self.name, tag)
        urls = self.cache.get(cache_key)
        if urls is not None:
            return urls
        urls = self.get_urls_from_tag(tag)
        if urls is None:
            return None
        self.cache.set(cache_key, urls)
        return urls
开发者ID:rafaelmartins,项目名称:arcoiro,代码行数:48,代码来源:__init__.py


示例9: __init__

 def __init__(self, cache_dir=None, default_timeout=60 * 60 * 24,
              api_key=None):
     # store it in cache for 1 day. using file system cache because
     # memcached is too mainstream. :)
     self.cache = FileSystemCache(cache_dir=cache_dir or '/tmp/__arcoiro__',
                                  default_timeout=default_timeout)
     self._api_key = api_key
开发者ID:rafaelmartins,项目名称:arcoiro,代码行数:7,代码来源:__init__.py


示例10: __init__

 def __init__(self, cache_dir, threshold, mode, key_prefix,
              use_signer=False, permanent=True):
     from werkzeug.contrib.cache import FileSystemCache
     self.cache = FileSystemCache(cache_dir, threshold=threshold, mode=mode)
     self.key_prefix = key_prefix
     self.use_signer = use_signer
     self.permanent = permanent
开发者ID:deeb230,项目名称:nzbhydra,代码行数:7,代码来源:sessions.py


示例11: parse

def parse(url):
    cache = FileSystemCache('.cachedir', threshold=100000)

    # get page
    page_source = cache.get(url)
    if page_source is None:
        # use firefox to get page with javascript generated content
        with closing(Firefox()) as browser:
            browser.get(url)
            page_source = browser.page_source
            cache.set(url, page_source, timeout=60*60*24*7) # week in seconds

    # extract text
    root = html.document_fromstring(page_source)
    # remove flash, images, <script>,<style>, etc
    Cleaner(kill_tags=['noscript'], style=True)(root) # lxml >= 2.3.1
    return root.text_content() # extract text
开发者ID:phektus,项目名称:HN-Summarizer,代码行数:17,代码来源:content_parser.py


示例12: WechatCache

class WechatCache(object):
    """基于文件的缓存

    """

    def __init__(self, cache_dir='cache', default_timeout=300):
        """初始化

        cache_dir是缓存目录
        """
        self.cache = FileSystemCache(cache_dir, default_timeout=default_timeout)

    def clear(self):
        """清空缓存
        """
        return self.cache.clear()

    def get(self, key):
        """获取缓存

        获取键值key的缓存值
        如果没有对应缓存,返回None
        """
        return self.cache.get(key)

    def add(self, key, value, timeout=None):
        """增加缓存

        如果键值key对应的缓存不存在,那么增加值value到键值key,过期时间timeout,默认300秒
        否则返回False(即不能覆盖设置缓存)
        """
        return self.cache.add(key, value, timeout)

    def set(self, key, value, timeout=None):
        """设置缓存

        设置键值key的缓存为value,过期时间300秒
        """
        return self.cache.set(key, value, timeout)

    def delete(self, key):
        """删除缓存

        删除键值key存储的缓存
        """
        return self.cache.delete(key)
开发者ID:dingshanliang,项目名称:WechatSogou,代码行数:46,代码来源:filecache.py


示例13: getSpaceInfo

def getSpaceInfo(spaceId):
    global spaceCache
    if spaceCache is None:
        mkdir_p(DURACLOUD_SPACE_CACHE_DIR)
        spaceCache = FileSystemCache(DURACLOUD_SPACE_CACHE_DIR, threshold=50, default_timeout=(24*3600), mode=384)

    # check spaceCache, otherwise fetch info from DuraCloud
    result = spaceCache.get(spaceId)
    if result is None:
        url = DURACLOUD_URL+ "/duradmin/download/contentItem"
        auth = HTTPBasicAuth(DURACLOUD_USERNAME, DURACLOUD_PASSWORD)
        payload = {'spaceId': spaceId, 'contentId': 'info.json'}
        try:
            response = requests.get(url, params=payload, auth=auth)
            result = response.json()
            spaceCache.set(spaceId, result)
        except RequestException as e:
            print e
            raise
    return result
开发者ID:gregjan,项目名称:durastream,代码行数:20,代码来源:durastream.py


示例14: __init__

    def __init__(self, app, cache_dir='/tmp/cache', lock_file=None):
        if not os.path.isdir(cache_dir):
            os.mkdir(cache_dir)
        self.app = app
        self.cache = FileSystemCache(cache_dir, default_timeout=600)
        self.t_local = threading.local()
        if lock_file is not None:
            lock_file = os.path.abspath(os.path.realpath(lock_file))
        else:
            lock_file = '/tmp/cache.lock'

        self.lock_file = open(lock_file, 'wb+')
        self.t_lock = threading.Lock()
开发者ID:huxt2014,项目名称:python-examples,代码行数:13,代码来源:middlewares.py


示例15: __init__

    def __init__(self, path, tolerance, expiration, max_values, run_tests=True,
                 max_file_size=0):
        """Constructor method.

        :param path: The path of the cache database file.
        :param tolerance: The tolerance, in seconds to which a TimeMap is
        considered young enough to be used as is.
        :param expiration: How long, in seconds, the cache entries are stored
        every get will be a CACHE MISS.
        :param max_values: The maximum number of TimeMaps stored in cache
        before some are deleted
        :param run_tests: (Optional) Tests the cache at initialization.
        :param max_file_size: (Optional) The maximum size (in Bytes) for a
        TimeMap cache value. When max_file_size=0, there is no limit to
        a cache value. When max_file_size=X > 0, the cache will not
        store TimeMap that require more than X Bytes on disk.
        """
        # Parameters Check
        if tolerance <= 0 or expiration <= 0 or max_values <= 0:
            raise CacheError("Cannot create cache: all parameters must be > 0")

        self.tolerance = relativedelta(seconds=tolerance)
        self.path = path.rstrip('/')
        self.max_file_size = max(max_file_size, 0)
        self.CHECK_SIZE = self.max_file_size > 0
        self.max_values = max_values
        self.backend = FileSystemCache(path,
                                       threshold=self.max_values,
                                       default_timeout=expiration)

        # Testing cache
        if run_tests:
            try:
                key = '1'
                val = 1
                self.backend.set(key, val)
                assert (not self.CHECK_SIZE) or self._check_size(key) > 0
                assert self.backend.get(key) == val
                os.remove(self.path + '/' + md5(key).hexdigest())
            except Exception as e:
                raise CacheError("Error testing cache: %s" % e)

        logging.debug(
            "Cache created. max_files = %d. Expiration = %d. "
            "max_file_size = %d" % (
                self.max_values, expiration, self.max_file_size))
开发者ID:machawk1,项目名称:timegate,代码行数:46,代码来源:cache.py


示例16: _set_cache

 def _set_cache(self):
     if self.app.config['TESTING']:
         self.cache = NullCache()
     else:
         if self.app.config['CACHE_TYPE'] == 'Null':
             self.cache = NullCache()
         elif self.app.config['CACHE_TYPE'] == 'Simple':
             self.cache = SimpleCache(
                 threshold=self.app.config['CACHE_THRESHOLD'],
                 default_timeout=self.app.config['CACHE_DEFAULT_TIMEOUT'])
         elif self.app.config['CACHE_TYPE'] == 'Memcached':
             self.cache = MemcachedCache(
                 self.app.config['CACHE_MEMCACHED_SERVERS'],
                 default_timeout=self.app.config['CACHE_DEFAULT_TIMEOUT'],
                 key_prefix=self.app.config['CACHE_KEY_PREFIX'])
         elif self.app.config['CACHE_TYPE'] == 'GAE':
             self.cache = GAEMemcachedCache(
                 default_timeout=self.app.config['CACHE_DEFAULT_TIMEOUT'],
                 key_prefix=self.app.config['CACHE_KEY_PREFIX'])
         elif self.app.config['CACHE_TYPE'] == 'FileSystem':
             self.cache = FileSystemCache(
                 self.app.config['CACHE_DIR'],
                 threshold=self.app.config['CACHE_THRESHOLD'],
                 default_timeout=self.app.config['CACHE_DEFAULT_TIMEOUT'])
开发者ID:dag,项目名称:flask-cache,代码行数:24,代码来源:cache.py


示例17: FileSystemCache

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from sys import argv, stderr, stdout

from werkzeug.contrib.cache import FileSystemCache
from progressbar import ProgressBar
from requests import get
from cStringIO import StringIO
from xml.etree.cElementTree import dump, ElementTree
from iso8601 import parse_date
from geopy import Point, distance
from math import sqrt
from cluster import HierarchicalClustering

feed_cache = FileSystemCache('feed_cache', 3600)
name_cache = FileSystemCache('name_cache', 360000)

class Event():
    def __init__(self, name, uri, datetime, latitude, longitude):
        self.name = name
        self.uri = uri
        self.datetime = datetime
        self.latitude = latitude
        self.longitude = longitude

    def __repr__(self):
        return "%s <%s>, %s @ %s, %s" % (self.name, self.uri, self.datetime, self.latitude, self.longitude)

def spatialDistance(a, b):
    """
开发者ID:erlehmann,项目名称:partycluster,代码行数:31,代码来源:partycluster.py


示例18: FileSystemCache

import sys
from contextlib import closing

import lxml.html as html # pip install 'lxml>=2.3.1'
from lxml.html.clean        import Cleaner
from selenium.webdriver     import Firefox         # pip install selenium
from werkzeug.contrib.cache import FileSystemCache # pip install werkzeug

cache = FileSystemCache('.cachedir', threshold=100000)

url = sys.argv[1] if len(sys.argv) > 1 else "http://www.schibsted.cl/testqa/"


# get page
page_source = cache.get(url)
if page_source is None:
    # use firefox to get page with javascript generated content
    with closing(Firefox()) as browser:
        browser.get(url)
        page_source = browser.page_source
    cache.set(url, page_source, timeout=60*60*24*7) # week in seconds


# extract text
root = html.document_fromstring(page_source)
# remove flash, images, <script>,<style>, etc
Cleaner(kill_tags=['noscript'], style=True)(root) # lxml >= 2.3.1
webtext = root.text_content() # extract text
f = open("C:/schibsted/data/Test.txt", "w");
print f
value = (webtext)
开发者ID:oscarfunk,项目名称:schibsted,代码行数:31,代码来源:util.py


示例19: FileSystemSessionInterface

class FileSystemSessionInterface(SessionInterface):
    """Uses the :class:`werkzeug.contrib.cache.FileSystemCache` as a session
    backend.

    .. versionadded:: 0.2
        The `use_signer` parameter was added.

    :param cache_dir: the directory where session files are stored.
    :param threshold: the maximum number of items the session stores before it
                      starts deleting some.
    :param mode: the file mode wanted for the session files, default 0600
    :param key_prefix: A prefix that is added to FileSystemCache store keys.
    :param use_signer: Whether to sign the session id cookie or not.
    :param permanent: Whether to use permanent session or not.
    """

    session_class = FileSystemSession

    def __init__(self, cache_dir, threshold, mode, key_prefix,
                 use_signer=False, permanent=True):
        from werkzeug.contrib.cache import FileSystemCache
        self.cache = FileSystemCache(cache_dir, threshold=threshold, mode=mode)
        self.key_prefix = key_prefix
        self.use_signer = use_signer
        self.permanent = permanent

    def open_session(self, app, request):
        sid = request.cookies.get(app.session_cookie_name)
        if not sid:
            sid = self._generate_sid()
            return self.session_class(sid=sid, permanent=self.permanent)
        if self.use_signer:
            signer = self._get_signer(app)
            if signer is None:
                return None
            try:
                sid = signer.unsign(sid)
            except BadSignature:
                sid = self._generate_sid()
                return self.session_class(sid=sid, permanent=self.permanent)

        data = self.cache.get(self.key_prefix + sid)
        if data is not None:
            return self.session_class(data, sid=sid)
        return self.session_class(sid=sid, permanent=self.permanent)

    def save_session(self, app, session, response):
        domain = self.get_cookie_domain(app)
        path = self.get_cookie_path(app)
        if not session:
            if session.modified:
                self.cache.delete(self.key_prefix + session.sid)
                response.delete_cookie(app.session_cookie_name,
                                       domain=domain, path=path)
            return

        httponly = self.get_cookie_httponly(app)
        secure = self.get_cookie_secure(app)
        expires = self.get_expiration_time(app, session)
        data = dict(session)
        self.cache.set(self.key_prefix + session.sid, data,
                       total_seconds(app.permanent_session_lifetime))
        if self.use_signer:
            session_id = self._get_signer(app).sign(session.sid)
        else:
            session_id = session.sid
        response.set_cookie(app.session_cookie_name, session_id,
                            expires=expires, httponly=httponly,
                            domain=domain, path=path, secure=secure)
开发者ID:deeb230,项目名称:nzbhydra,代码行数:69,代码来源:sessions.py


示例20: __init__

    def __init__(self, cache_dir='cache', default_timeout=300):
        """初始化

        cache_dir是缓存目录
        """
        self.cache = FileSystemCache(cache_dir, default_timeout=default_timeout)
开发者ID:dingshanliang,项目名称:WechatSogou,代码行数:6,代码来源:filecache.py



注:本文中的werkzeug.contrib.cache.FileSystemCache类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python cache.SimpleCache类代码示例发布时间:2022-05-26
下一篇:
Python atom.AtomFeed类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap