本文整理汇总了Python中wsgiref.handlers.format_date_time函数的典型用法代码示例。如果您正苦于以下问题:Python format_date_time函数的具体用法?Python format_date_time怎么用?Python format_date_time使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了format_date_time函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: s3_upload
def s3_upload(bucket, filename, contents):
from _sha1 import sha1
from base64 import b64encode
import hmac
import mimetypes
import time
from wsgiref.handlers import format_date_time
mimetype = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
timestamp = time.time()
date = format_date_time(timestamp)
string_to_sign = '\n'.join(['PUT', '', mimetype, date, '/{}/{}'.format(bucket, filename)])
signature = hmac.new(config.AWS_ACCESS_KEY_SECRET.encode(), string_to_sign.encode(), sha1).digest()
signature = b64encode(signature).decode()
headers = {
'Authorization': 'AWS {}:{}'.format(config.AWS_ACCESS_KEY_ID, signature),
'Content-Length': len(contents),
'Content-Type': mimetype,
'Date': date,
'Expires': format_date_time(timestamp + 365 * 24 * 60 * 60),
}
conn = http.client.HTTPSConnection(bucket + '.s3.amazonaws.com')
conn.request('PUT', '/' + filename, contents, headers)
response = conn.getresponse()
if response.status != 200:
raise RuntimeError('s3 upload failed with {} {}: {}'.format(response.status, response.reason, response.read()))
开发者ID:Gridium,项目名称:jinja_static_s3,代码行数:26,代码来源:jinja.py
示例2: append_static_headers
def append_static_headers(response):
now = datetime.now()
stamp = mktime(now.timetuple())
response += WebServer.date_header + ' ' + format_date_time(stamp) + '\r\n'
response += WebServer.content_type_header + ' text/html\r\n'
response += WebServer.last_modified_header + ' ' + format_date_time(stamp) + '\r\n\r\n'
return response
开发者ID:davityle,项目名称:python-web-server,代码行数:7,代码来源:web.py
示例3: json_data_structure
def json_data_structure(self):
return {
'name': self.name,
'description': self.description,
'date_modified': format_date_time(time.mktime(self.date_modified.timetuple())),
'date_created': format_date_time(time.mktime(self.date_created.timetuple())),
}
开发者ID:vegitron,项目名称:authz_group,代码行数:7,代码来源:models.py
示例4: get_headers
def get_headers(self, length):
return [
("Cache-Control", "public, max-age=3600"),
("Content-Length", str(length)),
("Content-Type", "application/json"),
("Expires", format_date_time(time() + 3600)),
("Last-Modified", format_date_time(time())),
]
开发者ID:peterkinalex,项目名称:olympia,代码行数:8,代码来源:theme_update.py
示例5: __call__
def __call__(self):
if self._last_mod_date and self._etag:
items = feedparser.parse(self._url, etag=self._etag, modified=self._last_mod_date)
elif self._last_mod_date:
items = feedparser.parse(self._url, modified=self._last_mod_date)
elif self._etag:
items = feedparser.parse(self._url, etag=self._etag)
else:
items = feedparser.parse(self._url)
try:
etag = items.etag
except AttributeError:
etag = None
mod_date = None
try:
mod_date = format_date_time(mktime(items.modified_parse))
except AttributeError:
if not etag:
mod_date = format_date_time(mktime(datetime.utcnow().timetuple()))
for entry in items.entries:
item = {}
if mod_date:
item['modified'] = mod_date
if etag:
item['etag'] = items.etag
item['url'] = self._url
item['title'] = entry.get('title', '')
item['link'] = entry.get('link', '')
item['summary'] = entry.get('summary', '')
item['description'] = entry.get('description', '')
item['content'] = entry.get('content', [{}])
#http://pythonhosted.org/feedparser/common-atom-elements.html
#Atom entries can have more than one content element,
# content is a list of dictionaries
item['author'] = entry.get('author', '')
item['guid'] = entry.get('guid', '')
if 'published_parsed' in entry:
item['date'] = datetime.fromtimestamp(mktime(entry['published_parsed'])).isoformat()
else:
item['date'] = datetime.utcnow().isoformat()
item['made_up_date'] = True
self._q.put(item, block=True)
self._event.set()
开发者ID:promlow,项目名称:rss-to-mongodb,代码行数:48,代码来源:update-channels.py
示例6: get
def get(self, id):
data = get_graphs(prefix=id)
filename = 'periodo-graph-{}'.format(id.replace('/', '-'))
if len(data['graphs']) > 0:
return cache.medium_time(
api.make_response(data, 200, filename=filename))
args = versioned_parser.parse_args()
version = args.get('version')
graph = database.get_graph(id, version=version)
filename += ('' if version is None else '-v{}'.format(version))
if not graph:
abort(404)
graph_etag = 'graph-{}-version-{}'.format(id, graph['version'])
if request.if_none_match.contains_weak(graph_etag):
return None, 304
headers = {}
headers['Last-Modified'] = format_date_time(graph['created_at'])
data = json.loads(graph['data'])
response = api.make_response(data, 200, headers, filename=filename)
response.set_etag(graph_etag, weak=True)
if version is None:
return cache.medium_time(response)
else:
return cache.long_time(response)
开发者ID:periodo,项目名称:periodo-server,代码行数:31,代码来源:resources.py
示例7: lineReceived
def lineReceived(self, line):
# We pretend to support HTTP GET (even though logically, PUT or POST would be better)
# Still, it is all in the URL for simplicity
if line.startswith("GET"):
self.receivedGET = True
l = line.split(" ")
if len(l) != 3:
self.transport.loseConnection()
return
command = urllib.unquote(l[1][1:])
# Are we a somebody?
self.imperio.receiveLine(command)
now = datetime.now()
stamp = mktime(now.timetuple())
data = "{sent:true}"
headers = "HTTP/1.1 200 OK\r\n"
headers += "Date: " + format_date_time(stamp) + "\r\n"
headers += "Server: Firenze instance, probably on Dolores.\r\n"
headers += "Content-Length: " + str(len(data)) + "\r\n"
headers += "Content-Type: application/json\r\n\r\n"
self.transport.write(headers)
self.transport.write(data)
self.transport.loseConnection()
开发者ID:ialexi,项目名称:dobby,代码行数:26,代码来源:imperio.py
示例8: _add_default_headers
def _add_default_headers(self):
super()._add_default_headers()
if hdrs.DATE not in self.headers:
# format_date_time(None) is quite expensive
self.headers.setdefault(hdrs.DATE, format_date_time(None))
self.headers.setdefault(hdrs.SERVER, self.SERVER_SOFTWARE)
开发者ID:Magdno1,项目名称:Arianrhod,代码行数:7,代码来源:protocol.py
示例9: expiry_date
def expiry_date():
from wsgiref.handlers import format_date_time
from datetime import datetime, timedelta
from time import mktime
future_date = datetime.now() + timedelta(days=10 * 365)
stamp = mktime(future_date.timetuple())
return format_date_time(stamp)
开发者ID:kylefox,项目名称:kirby-OLD,代码行数:7,代码来源:s3.py
示例10: click_redirect
def click_redirect():
destination = request.args['url'].encode('utf-8')
fullname = request.args['id'].encode('utf-8')
observed_mac = request.args['hash']
expected_hashable = ''.join((destination, fullname))
expected_mac = hmac.new(
tracking_secret, expected_hashable, hashlib.sha1).hexdigest()
if not constant_time_compare(expected_mac, observed_mac):
abort(403)
# fix encoding in the query string of the destination
destination = urllib.unquote(destination)
u = urlparse(destination)
if u.query:
query_dict = dict(parse_qsl(u.query))
# this effectively calls urllib.quote_plus on every query value
query = urllib.urlencode(query_dict)
destination = urlunparse(
(u.scheme, u.netloc, u.path, u.params, query, u.fragment))
now = format_date_time(time.time())
response = redirect(destination)
response.headers['Cache-control'] = 'no-cache'
response.headers['Pragma'] = 'no-cache'
response.headers['Date'] = now
response.headers['Expires'] = now
return response
开发者ID:DMDZYP,项目名称:reddit,代码行数:30,代码来源:tracker.py
示例11: atoms
def atoms(message, environ, response, transport, request_time):
"""Gets atoms for log formatting."""
if message:
r = '{} {} HTTP/{}.{}'.format(
message.method, message.path,
message.version[0], message.version[1])
headers = message.headers
else:
r = ''
headers = {}
if transport is not None:
remote_addr = parse_remote_addr(
transport.get_extra_info('addr', '127.0.0.1'))
else:
remote_addr = ('',)
atoms = {
'h': remote_addr[0],
'l': '-',
'u': '-',
't': format_date_time(None),
'r': r,
's': str(getattr(response, 'status', '')),
'b': str(getattr(response, 'output_length', '')),
'f': headers.get(hdrs.REFERER, '-'),
'a': headers.get(hdrs.USER_AGENT, '-'),
'T': str(int(request_time)),
'D': str(request_time).split('.', 1)[-1][:5],
'p': "<%s>" % os.getpid()
}
return atoms
开发者ID:homm,项目名称:aiohttp,代码行数:33,代码来源:helpers.py
示例12: get
def get(self, request, path):
if self.path:
parent_url = request.build_absolute_uri(reverse('browse:index',
kwargs={'path': ''.join(p + '/' for p in path.split('/')[:-2])}))
else:
parent_url = None
subpaths, sort_name, sort_reverse = self.get_subpath_data(request, path)
stat = os.stat(self.path_on_disk)
context = {
'path': self.path,
'message': request.GET.get('message'),
'parent_url': parent_url,
'subpaths': subpaths,
'stat': statinfo_to_dict(stat),
'additional_headers': {
'Last-Modified': format_date_time(stat.st_mtime),
},
'sort_name': sort_name,
'sort_reverse': sort_reverse,
'column_names': (('name', 'Name'), ('modified', 'Last modified'), ('size', 'Size'), ('owner_name', 'Owner')),
'dataset_submissions': DatasetSubmission.objects.filter(path_on_disk=self.path_on_disk),
}
return self.render(request, context, 'browse/directory')
开发者ID:no-reply,项目名称:DataStage,代码行数:26,代码来源:views.py
示例13: get
def get(self):
args = dataset_parser.parse_args()
dataset = database.get_dataset(args.get('version', None))
if not dataset:
if args['version']:
return {'status': 404,
'message': 'Could not find given version.'}, 404
else:
return {'status': 501,
'message': 'No dataset loaded yet.'}, 501
requested_etag = args.get('etag')
dataset_etag = 'periodo-dataset-version-{}'.format(dataset['id'])
if requested_etag == dataset_etag:
return None, 304
headers = {}
headers['Last-Modified'] = format_date_time(dataset['created_at'])
if not args['version']:
headers['Cache-control'] = 'public, max-age=0'
else:
headers['Cache-control'] = 'public, max-age=604800'
response = api.make_response(
attach_to_dataset(json.loads(dataset['data'])), 200, headers)
response.set_etag(dataset_etag, weak=True)
return response
开发者ID:weberjavi,项目名称:periodo-server,代码行数:32,代码来源:resources.py
示例14: outbound
def outbound(response):
"""Set caching headers for resources under assets/.
"""
request = response.request
website = request.website
uri = request.line.uri
version = website.version
response.headers["X-Gittip-Version"] = version
if not uri.startswith("/assets/"):
return response
response.headers.cookie.clear()
if response.code == 304:
# https://github.com/gittip/www.gittip.com/issues/1308
del response.headers["Content-Type"]
return response
if website.cache_static:
# https://developers.google.com/speed/docs/best-practices/caching
response.headers["Cache-Control"] = "public"
response.headers["Vary"] = "accept-encoding"
if "version" in uri.path:
# This specific asset is versioned, so it's fine to cache it.
response.headers["Expires"] = "Sun, 17 Jan 2038 19:14:07 GMT"
else:
# Asset is not versioned. Don't cache it, but set Last-Modified.
last_modified = get_last_modified(request.fs)
response.headers["Last-Modified"] = format_date_time(last_modified)
开发者ID:justinxreese,项目名称:www.gittip.com,代码行数:35,代码来源:cache_static.py
示例15: _start_response
def _start_response(self, status, response_headers, exc_info=None):
server_headers = [
('Date', str(format_date_time(time()))),
('Server', 'WSGIServer 0.2'),
]
self.headers_set = [status, response_headers + server_headers]
开发者ID:garrickw,项目名称:webServer,代码行数:7,代码来源:seletorsServer.py
示例16: atoms
def atoms(message, environ, response, transport, request_time):
"""Gets atoms for log formatting."""
if message:
r = "{} {} HTTP/{}.{}".format(message.method, message.path, message.version[0], message.version[1])
headers = message.headers
else:
r = ""
headers = {}
if transport is not None:
remote_addr = parse_remote_addr(transport.get_extra_info("addr", "127.0.0.1"))
else:
remote_addr = ("",)
atoms = {
"h": remote_addr[0],
"l": "-",
"u": "-",
"t": format_date_time(None),
"r": r,
"s": str(getattr(response, "status", "")),
"b": str(getattr(response, "output_length", "")),
"f": headers.get(hdrs.REFERER, "-"),
"a": headers.get(hdrs.USER_AGENT, "-"),
"T": str(int(request_time)),
"D": str(request_time).split(".", 1)[-1][:5],
"p": "<%s>" % os.getpid(),
}
return atoms
开发者ID:danfischetti,项目名称:shape-encoder,代码行数:30,代码来源:helpers.py
示例17: init
def init(self, num_entries=25):
from wsgiref.handlers import format_date_time
from time import mktime
self.num_entries = num_entries
self.env.jinja2.filters['rfc822'] = lambda x: format_date_time(mktime(x.timetuple()))
开发者ID:sebix,项目名称:acrylamid,代码行数:7,代码来源:feeds.py
示例18: _output_if_modified
def _output_if_modified(self, content):
"""
Check for ETag, then fall back to If-Modified-Since
"""
with TraceContext() as root:
with root.span("CacheableHandler._output_if_modified"):
modified = True
# Normalize content
try:
content = str(content)
except UnicodeEncodeError:
content = unicode(content).encode('utf-8')
etag = 'W/"{}"'.format(hashlib.md5(content).hexdigest()) # Weak ETag
self.response.headers['ETag'] = etag
if_none_match = self.request.headers.get('If-None-Match')
if if_none_match and etag in [x.strip() for x in if_none_match.split(',')]:
self.response.set_status(304)
modified = False
# Fall back to If-Modified-Since
if modified and self._last_modified is not None:
last_modified = format_date_time(mktime(self._last_modified.timetuple()))
if_modified_since = self.request.headers.get('If-Modified-Since')
self.response.headers['Last-Modified'] = last_modified
if if_modified_since and if_modified_since == last_modified:
self.response.set_status(304)
modified = False
if modified:
self.response.out.write(content)
return modified
开发者ID:ZachOrr,项目名称:the-blue-alliance,代码行数:35,代码来源:base_controller.py
示例19: outbound
def outbound(request, response, website):
"""Set caching headers for resources under assets/.
"""
uri = request.line.uri
if not uri.startswith('/assets/'):
return response
response.headers.cookie.clear()
response.headers.pop('Vary')
if response.code != 200:
return response
if website.cache_static:
# https://developers.google.com/speed/docs/best-practices/caching
response.headers['Cache-Control'] = 'public'
response.headers['Vary'] = 'accept-encoding'
# all assets are versioned, so it's fine to cache them
response.headers['Expires'] = 'Sun, 17 Jan 2038 19:14:07 GMT'
last_modified = get_last_modified(request.fs)
response.headers['Last-Modified'] = format_date_time(last_modified)
开发者ID:Resike,项目名称:www.gittip.com,代码行数:25,代码来源:cache_static.py
示例20: add_header
def add_header(response):
response.cache_control.max_age = 0
now = datetime.datetime.now()
expires_time = now + datetime.timedelta(seconds=1)
response.headers['Cache-Control'] = 'public'
response.headers['Expires'] = format_date_time(time.mktime(expires_time.timetuple()))
return response
开发者ID:InternetProgrammingCS330,项目名称:Sparrow,代码行数:7,代码来源:controllers.py
注:本文中的wsgiref.handlers.format_date_time函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论