本文整理汇总了Python中r2.lib.utils.epoch_timestamp函数的典型用法代码示例。如果您正苦于以下问题：Python epoch_timestamp函数的具体用法？Python epoch_timestamp怎么用？Python epoch_timestamp使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了epoch_timestamp函数的15个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: cast_vote
def cast_vote(user, thing, direction, **data):
    """Record a vote and enqueue it for asynchronous processing.

    user: The voter; its `_id` is stored in the queued payload
    thing: The item voted on; its `_fullname` is stored in the payload and
        its `vote_queue_name` selects the queue
    direction: The vote direction, stored as given
    data: Extra keyword data carried with the vote. "ip" is always
        (over)written from the current request, and "org" is added when an
        IP is available
    """
    update_vote_lookups(user, thing, direction)

    # Take the timestamp before any of the lookups below.
    queued_at = int(epoch_timestamp(datetime.now(g.tz)))

    # A request object without an "ip" attribute yields None here.
    client_ip = getattr(request, "ip", None)
    data['ip'] = client_ip
    if client_ip is not None:
        data['org'] = organization_by_ips(client_ip)

    vote_data = {
        "user_id": user._id,
        "thing_fullname": thing._fullname,
        "direction": direction,
        "date": queued_at,
        "data": data,
    }

    # Hooks receive the very same dict that is queued, so they can add to it.
    vote_hook = hooks.get_hook("vote.get_vote_data")
    vote_hook.call(
        data=data,
        user=user,
        thing=thing,
        request=request,
        context=c,
    )

    # The vote event will actually be sent from an async queue processor, so
    # the context data has to be captured now, while the request is live.
    if not g.running_as_script:
        context_data = Event.get_context_data(request, c)
        sensitive_data = Event.get_sensitive_context_data(request, c)
        vote_data["event_data"] = {
            "context": context_data,
            "sensitive": sensitive_data,
        }

    serialized = json.dumps(vote_data)
    amqp.add_item(thing.vote_queue_name, serialized)
开发者ID:Arinzeokeke,项目名称:reddit,代码行数:33,代码来源:voting.py
示例2: vote_event
def vote_event(self, vote, old_vote=None, event_base=None, request=None,
               context=None):
    """Build and save a 'vote' event for event-collector.

    vote: A Storage object representing the new vote, as handled by
        vote.py / queries.py
    old_vote: A Storage object representing the previous vote on this
        thing, if there is one. NOTE: This object has a different
        set of attributes compared to the new "vote" object.
    event_base: The base fields for an Event. If not given, caller MUST
        supply a pylons.request and pylons.c object to build a base from
    request, context: Should be pylons.request & pylons.c respectively;
        used to build the base Event if event_base is not given
    """
    if event_base is None:
        event_base = Event.base_from_request(request, context)

    target = vote._thing2

    event_base["event_topic"] = "vote"
    event_base["event_name"] = "vote_server"
    vote_ts = epoch_timestamp(vote._date)
    event_base["event_ts"] = _epoch_to_millis(vote_ts)
    event_base["vote_target"] = target._fullname
    event_base["vote_direction"] = self.VOTES[vote._name]

    if old_vote:
        # old_vote.date is passed to _epoch_to_millis directly, without
        # going through epoch_timestamp first.
        event_base["prev_vote_direction"] = self.VOTES[old_vote.direction]
        event_base["prev_vote_ts"] = _epoch_to_millis(old_vote.date)

    # Self posts are reported as "self" rather than "link".
    vote_type = target.__class__.__name__.lower()
    if vote_type == "link" and target.is_self:
        vote_type = "self"
    event_base["vote_type"] = vote_type

    subreddit = target.subreddit_slow
    event_base["sr"] = subreddit.name
    event_base["sr_id"] = str(subreddit._id)

    self.save_event(event_base)
开发者ID:ActivateServices,项目名称:reddit,代码行数:33,代码来源:eventcollector.py
示例3: create
def create(cls, user, thing, vote):
    """Store the serialized details of `vote`, keyed by thing and voter.

    user, thing: Unused, but they need to be there for calling this
        automatically when updating views of a DenormalizedRel
    vote: The vote to store; its data, effects, direction and date are
        serialized to JSON under vote.thing._id36 / vote.user._id36

    The IP address and the effects' notes are split out of the JSON and
    written separately so they can be TTLed.
    """
    vote_data = vote.data.copy()

    # pull the IP out of the data to store it separately with a TTL; a vote
    # whose data has no "ip" key is treated the same as one with an empty IP
    # instead of raising KeyError
    ip = vote_data.pop("ip", None)

    effects_data = vote.effects.serializable_data
    # split the notes out to store separately
    notes = effects_data.pop("notes", None)

    data = json.dumps(
        {
            "direction": Vote.serialize_direction(vote.direction),
            "date": int(epoch_timestamp(vote.date)),
            "data": vote_data,
            "effects": effects_data,
        }
    )
    cls._set_values(vote.thing._id36, {vote.user._id36: data})

    # write the IP data and notes separately so they can be TTLed
    if ip:
        VoterIPByThing.create(vote, ip)
    if notes:
        VoteNote.set(vote, notes)
开发者ID:scott71,项目名称:reddit,代码行数:29,代码来源:vote.py
示例4: __init__
def __init__(self, topic, event_type,
             time=None, uuid=None, request=None, context=None, testing=False,
             data=None, obfuscated_data=None):
    """Create a new event for event-collector.

    topic: Used to filter events into appropriate streams for processing
    event_type: Used for grouping and sub-categorizing events
    time: Should be a datetime.datetime object in UTC timezone; defaults
        to the current UTC time
    uuid: Should be a UUID object; a fresh uuid4 is generated if omitted
    request, context: Should be pylons.request & pylons.c respectively
    testing: Whether to send the event to the test endpoint
    data: A dict of field names/values to initialize the payload with
    obfuscated_data: Same as `data`, but fields that need obfuscation

    `data` and `obfuscated_data` are copied, so the caller's dicts are
    never modified by this event.
    """
    self.topic = topic
    self.event_type = event_type
    self.testing = testing

    if not time:
        time = datetime.datetime.now(pytz.UTC)
    self.timestamp = _epoch_to_millis(epoch_timestamp(time))

    if not uuid:
        uuid = uuid4()
    self.uuid = str(uuid)

    # Copy rather than alias: the dicts are updated below, and aliasing
    # would write those fields into the caller's own dicts.
    self.payload = dict(data) if data else {}
    self.obfuscated_data = dict(obfuscated_data) if obfuscated_data else {}

    if context and request:
        # Context-derived values take precedence over caller-supplied ones.
        self.payload.update(self.get_context_data(request, context))
        self.obfuscated_data.update(
            self.get_sensitive_context_data(request, context))
开发者ID:niklasnson,项目名称:reddit,代码行数:33,代码来源:eventcollector.py
示例5: convert_old_vote_data
def convert_old_vote_data(data, timestamp):
    """Translate a vote payload from the old dict layout to the new one.

    data: The old-format dict; reads "uid", "tid", "dir", "ip", "cheater",
        "info" and, if present, "event_data"
    timestamp: A datetime for the vote; its tzinfo is replaced with UTC
        before conversion

    Returns a new dict with "user_id", "thing_fullname", "date",
    "direction", "data" and optionally "event_data".
    """
    utc_timestamp = timestamp.replace(tzinfo=pytz.UTC)
    converted = {
        "user_id": data["uid"],
        "thing_fullname": data["tid"],
        "date": int(epoch_timestamp(utc_timestamp)),
    }

    # Equality (not identity) comparisons are kept on purpose so values that
    # merely compare equal to True/False map the same way as before;
    # anything else counts as an unvote.
    old_direction = data["dir"]
    if old_direction == True:
        direction = Vote.DIRECTIONS.up
    elif old_direction == False:
        direction = Vote.DIRECTIONS.down
    else:
        direction = Vote.DIRECTIONS.unvote
    converted["direction"] = direction

    extra_data = {
        "ip": data["ip"],
        "valid_event": True,  # vote wouldn't have been queued for invalid ones
    }
    # Optional old fields are carried over under their new names only when
    # they hold a truthy value.
    renamed_fields = (("cheater", "invalid_source"), ("info", "referrer"))
    for old_key, new_key in renamed_fields:
        old_value = data[old_key]
        if old_value:
            extra_data[new_key] = old_value
    converted["data"] = extra_data

    if "event_data" in data:
        converted["event_data"] = data["event_data"]

    return converted
开发者ID:zeantsoi,项目名称:reddit,代码行数:31,代码来源:voting.py
示例6: log_deletion
def log_deletion(cls, sr, page_name, image_name, old_url):
    """Record that an image was deleted from a wiki page.

    sr, page_name: Passed to WikiPage.id_for to build the row key
    image_name: Name of the deleted image
    old_url: URL the image had before deletion; nothing is logged when
        this is falsy
    """
    if not old_url:
        # it didn't exist before
        return

    rowkey = WikiPage.id_for(sr, page_name)
    deleted_at = datetime.now(pytz.UTC)
    colkey = uuid1()
    deletion_record = {
        "image_name": image_name,
        "url": old_url,
        "timestamp": epoch_timestamp(deleted_at),
    }
    cls._set_values(rowkey, {colkey: deletion_record})
开发者ID:zeantsoi,项目名称:reddit,代码行数:10,代码来源:wiki.py
示例7: vote_event
def vote_event(self, vote):
    """Build and save a 'vote' event for event-collector.

    vote: An r2.models.vote Vote object
    """
    def direction_name(a_vote):
        # Map vote directions to the readable names used by the data team.
        if a_vote.is_upvote:
            return "up"
        if a_vote.is_downvote:
            return "down"
        return "clear"

    event_data = vote.event_data
    event = EventV2(
        topic="vote_server",
        event_type="server_vote",
        time=vote.date,
        data=event_data["context"],
        obfuscated_data=event_data["sensitive"],
    )

    event.add("vote_direction", direction_name(vote))

    subreddit = vote.thing.subreddit_slow
    event.add("sr_id", subreddit._id)
    event.add("sr_name", subreddit.name)

    target = vote.thing
    # Self posts are reported as "self" rather than "link".
    target_type = target.__class__.__name__.lower()
    if target_type == "link" and target.is_self:
        target_type = "self"
    event.add("target_fullname", target._fullname)
    event.add("target_type", target_type)

    previous = vote.previous_vote
    if previous:
        event.add("prev_vote_direction", direction_name(previous))
        previous_ts = epoch_timestamp(previous.date)
        event.add("prev_vote_ts", _epoch_to_millis(previous_ts))

    if vote.is_automatic_initial_vote:
        event.add("auto_self_vote", True)

    for field, value in vote.effects.serializable_data.iteritems():
        # rename the "notes" field to "process_notes" for the event
        event.add("process_notes" if field == "notes" else field, value)

    self.save_event(event)
开发者ID:scott71,项目名称:reddit,代码行数:53,代码来源:eventcollector.py
示例8: __init__
def __init__(self, topic, event_type,
             time=None, uuid=None, request=None, context=None, testing=False,
             data=None, obfuscated_data=None):
    """Create a new event for event-collector.

    topic: Used to filter events into appropriate streams for processing
    event_type: Used for grouping and sub-categorizing events
    time: Should be a datetime.datetime object in UTC timezone; defaults
        to the current UTC time
    uuid: Should be a UUID object; a fresh uuid4 is generated if omitted
    request, context: Should be pylons.request & pylons.c respectively
    testing: Whether to send the event to the test endpoint
    data: A dict of field names/values to initialize the payload with
    obfuscated_data: Same as `data`, but fields that need obfuscation
    """
    self.topic = topic
    self.event_type = event_type
    self.testing = testing

    event_time = time if time else datetime.datetime.now(pytz.UTC)
    self.timestamp = _epoch_to_millis(epoch_timestamp(event_time))

    self.uuid = str(uuid if uuid else uuid4())

    # Start from copies so the caller's dicts are left untouched.
    self.payload = dict(data) if data else {}
    self.obfuscated_data = dict(obfuscated_data) if obfuscated_data else {}

    if context and request:
        # Values the caller already set must win, so context-derived
        # fields are only filled in for keys that are still missing.
        for key, value in self.get_context_data(request, context).items():
            self.payload.setdefault(key, value)

        sensitive = self.get_sensitive_context_data(request, context)
        for key, value in sensitive.items():
            self.obfuscated_data.setdefault(key, value)
开发者ID:P3lUZa,项目名称:reddit,代码行数:47,代码来源:eventcollector.py
示例9: cast_vote
def cast_vote(user, thing, direction, **data):
    """Record a vote and enqueue it for asynchronous processing.

    user: The voter; its `_id` is stored in the queued payload
    thing: The item voted on; anything other than a Link or Comment is
        silently ignored
    direction: The vote direction, stored as given
    data: Extra keyword data carried with the vote. "ip" is always
        (over)written from the current request, and "org" is added when an
        IP is available
    """
    votable_types = (Link, Comment)
    if not isinstance(thing, votable_types):
        return

    update_vote_lookups(user, thing, direction)

    # Take the timestamp before any of the lookups below.
    queued_at = int(epoch_timestamp(datetime.now(g.tz)))

    # A request object without an "ip" attribute yields None here.
    client_ip = getattr(request, "ip", None)
    data['ip'] = client_ip
    if client_ip is not None:
        data['org'] = organization_by_ips(client_ip)

    vote_data = {
        "user_id": user._id,
        "thing_fullname": thing._fullname,
        "direction": direction,
        "date": queued_at,
        "data": data,
    }

    # Hooks receive the very same dict that is queued, so they can add to it.
    vote_hook = hooks.get_hook("vote.get_vote_data")
    vote_hook.call(
        data=data,
        user=user,
        thing=thing,
        request=request,
        context=c,
    )

    # The vote event will actually be sent from an async queue processor, so
    # the context data has to be captured now, while the request is live.
    if not g.running_as_script:
        context_data = Event.get_context_data(request, c)
        sensitive_data = Event.get_sensitive_context_data(request, c)
        vote_data["event_data"] = {
            "context": context_data,
            "sensitive": sensitive_data,
        }

    try:
        vote_dump = json.dumps(vote_data)
    except UnicodeDecodeError:
        # Unserializable payloads are logged and the vote is dropped.
        g.log.error("Got weird unicode in the vote data: %r", vote_data)
        return

    # The guard at the top guarantees thing is a Link or a Comment here.
    queue = "vote_link_q" if isinstance(thing, Link) else "vote_comment_q"
    amqp.add_item(queue, vote_dump)
开发者ID:13steinj,项目名称:reddit,代码行数:47,代码来源:voting.py
示例10: submit_event
def submit_event(self, new_link, event_base=None, request=None,
                 context=None):
    """Build and save a 'submit' event for event-collector.

    new_link: An r2.models.Link object
    event_base: The base fields for an Event. If not given, caller MUST
        supply a pylons.request and pylons.c object to build a base from
    request, context: Should be pylons.request & pylons.c respectively;
        used to build the base Event if event_base is not given
    """
    if event_base is None:
        event_base = Event.base_from_request(request, context)

    event_base["event_topic"] = "submit"
    event_base["event_name"] = "submit_server"
    event_base["event_ts"] = _epoch_to_millis(
        epoch_timestamp(new_link._date))

    event_base["id"] = new_link._fullname
    if new_link.is_self:
        event_base["type"] = "self"
    else:
        event_base["type"] = "link"

    subreddit = new_link.subreddit_slow
    event_base["sr"] = subreddit.name
    event_base["sr_id"] = str(subreddit._id)

    event_base["title"] = new_link.title

    if new_link._spam:
        event_base["flagged_spam"] = True
        ban_info = getattr(new_link, "ban_info", {})
        banner = ban_info.get("banner")
        if banner:
            event_base["spam_reason"] = banner

    # Self posts contribute their text, link posts their URL.
    if new_link.is_self:
        content = new_link.selftext
    else:
        content = new_link.url
    event_base["length"] = len(content)
    event_base["text"] = content

    # "length" keeps the full size; only "text" is cut, by as many
    # characters as the serialized event exceeds MAX_EVENT_SIZE.
    overflow = len(json.dumps(event_base)) - MAX_EVENT_SIZE
    if overflow > 0:
        event_base["text"] = content[:-overflow]

    self.save_event(event_base)
开发者ID:ActivateServices,项目名称:reddit,代码行数:46,代码来源:eventcollector.py
示例11: __init__
def __init__(self, topic, event_type,
             time=None, uuid=None, request=None, context=None, testing=False):
    """Create a new event for event-collector.

    topic: Used to filter events into appropriate streams for processing
    event_type: Used for grouping and sub-categorizing events
    time: Should be a datetime.datetime object in UTC timezone; defaults
        to the current UTC time
    uuid: Should be a UUID object; a fresh uuid4 is generated if omitted
    request, context: Should be pylons.request & pylons.c respectively;
        request/user fields are only added when both are given
    testing: Whether to send the event to the test endpoint
    """
    self.topic = topic
    self.event_type = event_type
    self.testing = testing

    event_time = time if time else datetime.datetime.now(pytz.UTC)
    self.timestamp = _epoch_to_millis(epoch_timestamp(event_time))

    self.uuid = str(uuid if uuid else uuid4())

    self.payload = {}
    self.obfuscated_data = {}

    if not (context and request):
        return

    # Logged-in users are identified by account, everyone else by the
    # "loid" cookie when the request carries one.
    if context.user_is_loggedin:
        account = context.user
        self.add("user_id", account._id)
        self.add("user_name", account.name)
    else:
        loid = request.cookies.get("loid", None)
        if loid:
            self.add("loid", loid)

    oauth2_client = getattr(context, "oauth2_client", None)
    if oauth2_client:
        self.add("oauth2_client_id", oauth2_client._id)

    self.add("domain", request.host)
    self.add("user_agent", request.user_agent)

    if getattr(request, "ip", None):
        self.add("client_ip", request.ip, obfuscate=True)
开发者ID:joannekoong,项目名称:reddit,代码行数:43,代码来源:eventcollector.py
示例12: add_target_fields
def add_target_fields(self, target):
    """Add fields describing `target` to this event.

    target: The object the event is about; nothing is added when it is
        falsy. Extra fields are added depending on whether it is a
        Comment, Link or Message.
    """
    if not target:
        return

    from r2.models import Comment, Link, Message

    self.add("target_id", target._id)
    self.add("target_fullname", target._fullname)
    self.add("target_type", target.__class__.__name__.lower())

    # If the target is an Account or Subreddit (or has a "name" attr),
    # add the target_name
    if hasattr(target, "name"):
        self.add("target_name", target.name)

    # Add info about the target's author for comments, links, & messages
    if isinstance(target, (Comment, Link, Message)):
        author = target.author_slow
        if target._deleted or author._deleted:
            # Deleted targets/authors are reported with placeholder values.
            author_id, author_name = 0, "[deleted]"
        else:
            author_id, author_name = author._id, author.name
        self.add("target_author_id", author_id)
        self.add("target_author_name", author_name)

    # Add info about the url being linked to for link posts
    if isinstance(target, Link) and not target.is_self:
        self.add("target_url", target.url)
        self.add("target_url_domain", target.link_domain())

    # Add info about the link being commented on for comments
    if isinstance(target, Comment):
        link_id36 = to36(target.link_id)
        link_fullname = Link._fullname_from_id36(link_id36)
        self.add("link_id", target.link_id)
        self.add("link_fullname", link_fullname)

    # Add info about when target was originally posted for links/comments
    if isinstance(target, (Comment, Link)):
        created_ts = epoch_timestamp(target._date)
        self.add("target_created_ts", _epoch_to_millis(created_ts))
开发者ID:P3lUZa,项目名称:reddit,代码行数:39,代码来源:eventcollector.py
示例13: _datetime_to_millis
def _datetime_to_millis(dt):
    """Return the standard datetime `dt` as epoch milliseconds."""
    epoch_value = epoch_timestamp(dt)
    return _epoch_to_millis(epoch_value)
开发者ID:HeliumSquid,项目名称:reddit,代码行数:3,代码来源:eventcollector.py
示例14: cache_poisoning_event
def cache_poisoning_event(self, poison_info, event_base=None, request=None,
                          context=None):
    """Build and save a 'cache_poisoning_server' event for event-collector.

    poison_info: Details from the client about the poisoning event. Reads
        "poisoner_name", "resp_headers", "source" and (when there are
        response headers) "cache_policy". NOTE: "poisoner_name" is popped
        from this dict, so the caller's dict is modified.
    event_base: The base fields for an Event. If not given, caller MUST
        supply a pylons.request and pylons.c object to build a base from
    request, context: Should be pylons.request & pylons.c respectively;
        used to build the base Event if event_base is not given. `context`
        is also consulted for "web" sources.

    Raises Exception when poison_info["source"] is neither "web" nor
    "mweb".
    """
    if event_base is None:
        event_base = Event.base_from_request(request, context)

    event_base["event_name"] = "cache_poisoning_server"
    event_base["event_topic"] = "cache_poisoning"

    now_utc = datetime.datetime.now(pytz.UTC)
    event_base["event_ts"] = _epoch_to_millis(epoch_timestamp(now_utc))

    # The poisoner's name is removed first so it is not copied into the
    # event along with the remaining client-supplied fields.
    poisoner_name = poison_info.pop("poisoner_name")
    event_base.update(**poison_info)

    # Default blame; refined below when there is better evidence.
    event_base["poison_blame_guess"] = "proxy"

    resp_headers = poison_info["resp_headers"]
    if resp_headers:
        # Check if the caching headers we got back match the current policy
        event_base["cache_headers_valid"] = cache_headers_valid(
            poison_info["cache_policy"], resp_headers)

    # try to determine what kind of poisoning we're dealing with
    source = poison_info["source"]
    if source == "web":
        # Do we think they logged in the usual way, or do we think they
        # got poisoned with someone else's session cookie?
        login_hook = hooks.get_hook("poisoning.guess_valid_login")
        if login_hook.call_until_return(poisoner_name=poisoner_name):
            # Maybe a misconfigured local Squid proxy + multiple
            # clients?
            event_base["poison_blame_guess"] = "local_proxy"
            credentialed = False
        else:
            # If the current user is the poisoner, guess we got poisoned
            # with a cookie-bearing response.
            credentialed = bool(context.user_is_loggedin and
                                context.user.name == poisoner_name)
    elif source == "mweb":
        # All mweb responses contain an OAuth token, so we have to assume
        # whoever got this response can perform actions as the poisoner
        credentialed = True
    else:
        raise Exception("Unsupported source in cache_poisoning_event")
    event_base["poison_credentialed_guess"] = credentialed

    # Check if the CF-Cache-Status header is present (this header is not
    # present if caching is disallowed.) If it is, the CDN caching rules
    # are all jacked up.
    if resp_headers and "cf-cache-status" in resp_headers:
        event_base["poison_blame_guess"] = "cdn"

    serialized_size = len(json.dumps(event_base))
    if serialized_size - MAX_EVENT_SIZE > 0:
        # It's almost definitely the headers that are too large
        event_base["resp_headers"] = {}

    # No JSON support in the DBs we target, so the headers are stored as a
    # JSON string.
    event_base["resp_headers"] = json.dumps(event_base["resp_headers"])

    self.save_event(event_base)
开发者ID:rgandsam,项目名称:reddit,代码行数:71,代码来源:eventcollector.py
示例15: current_epoch
def current_epoch():
    """Return epoch_timestamp() of the current UTC time as an int."""
    now_utc = datetime.now(pytz.UTC)
    return int(epoch_timestamp(now_utc))
开发者ID:Arinzeokeke,项目名称:reddit,代码行数:2,代码来源:signing.py
注：本文中的r2.lib.utils.epoch_timestamp函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论