本文整理汇总了Python中re2.search函数的典型用法代码示例。如果您正苦于以下问题:Python search函数的具体用法?Python search怎么用?Python search使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了search函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: response
def response(context, flow):
    """Feed matching trade request/response pairs to the TDD worker thread.

    "Called when a server response has been received" -- only responses are
    intercepted, because the interesting unit is the request/response pair.
    The captured response and its request both belong to the transaction
    represented by mitmproxy.models.HTTPFlow()
    ("HTTPFlow is collection of objects representing a single HTTP
    transaction").
    More info: http://docs.mitmproxy.org/en/stable/dev/models.html
    """
    if not flow.request.host.endswith('.thesettlersonline.pl'):
        return
    if "application/x-amf" not in flow.response.headers.get("Content-Type", "_"):
        return

    # All of these must be present in the response body.
    response_markers = (
        'defaultGame.Communication.VO.TradeWindow.dTradeWindowResultVO',
        'userAcceptedTradeIDs',
        'tradeOffers',
    )

    with decoded(flow.response):
        res = flow.response.content
        req = flow.request.content

        is_trade_pair = (
            all(search(marker, res) for marker in response_markers)
            and search('GetAvailableOffers', req)
        )
        if not is_trade_pair:
            return

        log.debug("got trade REQ/RESP pair, feeding TDD thread...")
        try:
            worker = Thread(target=ttd._incoming_traffic_handler, args=(context, flow,))
            worker.setDaemon(True)
            worker.start()
        except (KeyboardInterrupt, SystemExit):
            log.info('caught either KeyboardInterrupt or SystemExit, quitting threads')
            # NOTE(review): outside a class body `__stop` is not name-mangled,
            # so this call presumably raises AttributeError -- confirm.
            worker.__stop()
            import thread
            thread.interrupt_main()
开发者ID:internety,项目名称:smarTSO,代码行数:27,代码来源:TradeDumper.py
示例2: find_competitor_list
def find_competitor_list(search_text):
    """Return the numbered list found in search_text if it passes the checks, else None.

    The list must start with entries numbered 1 and 2 followed by at least two
    more numbered lines.  Lists dominated by clock times, by EVENT tokens, or
    appearing in text carrying a WRONG_NUMBERED_LIST token are rejected.
    Lists longer than ten lines are accepted outright; shorter ones must pass
    one of the per-line heuristics below.
    """
    processed_text = grammar_matcher.StringProcessor(search_text)
    list_match = re.search(r'\n0*1[^\d].+\n^0*2[^\d].+\n(?:^\d+.+\n){2,}', processed_text.text, re.MULTILINE)
    if not list_match:
        return None

    numbered_list = list_match.group(0)
    num_lines = numbered_list.count('\n')

    # good list of times! workshops, etc! performance/shows/club-set times!
    time_hits = re.findall(r'\d ?[.:h] ?\d\d|\bam\b|\bpm\b', numbered_list)
    if len(time_hits) > num_lines / 4:
        return None

    processed_numbered_list = grammar_matcher.StringProcessor(numbered_list, processed_text.match_on_word_boundaries)
    if len(processed_numbered_list.get_tokens(rules.EVENT)) > num_lines / 8:
        return None
    if processed_text.has_token(keywords.WRONG_NUMBERED_LIST):
        return None

    if num_lines > 10:
        return numbered_list

    lines = numbered_list.split('\n')

    def count_matching(pattern):
        # Number of list lines on which `pattern` is found.
        return sum(1 for entry in lines if re.search(pattern, entry))

    if count_matching(r'[^\d\W].*[-(]') > num_lines / 2:
        return numbered_list

    for style_pattern in ['crew', 'pop|boog', 'lock', 'b\W?(?:boy|girl)']:
        if count_matching(style_pattern) > num_lines / 8:
            return numbered_list

    # maybe separate on kana vs kanji?
    if processed_text.match_on_word_boundaries == regex_keywords.WORD_BOUNDARIES:
        avg_words = 1.0 * sum(len(entry.split(' ')) for entry in lines) / num_lines
        if avg_words < 3:
            return numbered_list

    return None
开发者ID:mikelambert,项目名称:dancedeets-monorepo,代码行数:30,代码来源:event_structure.py
示例3: on_call
def on_call(self, call, process):
    """Record process launches that hide their window.

    Handles CreateProcessInternalW (hidden PowerShell switches or the
    CREATE_NO_WINDOW flag) and ShellExecuteExW (Show == 0).  Each hit is
    appended to self.hidden as (process name, spawned target) and to
    self.data as a {"Process": "parent -> child"} entry.
    """
    api = call["api"]
    target_argument = None

    if api == "CreateProcessInternalW":
        command_line = self.get_argument(call, "CommandLine").lower()
        creation_flags = int(self.get_argument(call, "CreationFlags"), 16)

        # Handle Powershell CommandLine Arguments
        hidden_powershell = "powershell" in command_line and (
            re.search("-win[ ]+hidden", command_line)
            or re.search("-windowstyle[ ]+hidden", command_line)
        )
        # Handle CREATE_NO_WINDOW flag, ignored for CREATE_NEW_CONSOLE and DETACHED_PROCESS
        no_window = creation_flags & 0x08000000 and not (
            creation_flags & 0x10 or creation_flags & 0x8
        )
        if hidden_powershell or no_window:
            target_argument = "ApplicationName"

    elif api == "ShellExecuteExW":
        # Handle SW_HIDE flag
        if int(self.get_argument(call, "Show"), 10) == 0:
            target_argument = "FilePath"

    if target_argument is not None:
        proc = process["process_name"]
        spawn = self.get_argument(call, target_argument)
        self.hidden.append((proc, spawn))
        self.data.append({"Process": proc + " -> " + spawn})
开发者ID:kevross33,项目名称:community-modified,代码行数:26,代码来源:stealth_window.py
示例4: _route
def _route(user, jid, msg):
    """Dispatch an incoming message to the first view whose regex matches.

    An active user session gets the message body first; only when it raises
    SessionCallError does regular routing take place.  Named groups from the
    resource match and from the message match are merged and passed to the
    view as keyword arguments.  Returns None implicitly when no view matches.
    """
    session = env.user.session()
    try:
        return session(msg['body'])
    except SessionCallError:
        pass

    message = env.user.resolve_aliases(msg['body'])

    # Pick the route table of the first entry whose resource pattern matches.
    # NOTE(review): `route` stays unbound if no entry matches -- presumably
    # self.route always ends with a catch-all; confirm.
    args = {}
    for entry in self.route:
        resource_match = re.search(entry['resource'], msg['resource'])
        if not resource_match:
            continue
        args = resource_match.groupdict()
        route = entry['route']
        break

    for regex, view in route:
        found = re.search(regex, message)
        if not found:
            continue
        args.update(found.groupdict())
        log.debug(">>> %s routed to %s(%s) via '%s'" % \
                  (jid, view.__name__, str(args), regex.pattern))
        return view(**args)
开发者ID:radjah,项目名称:point-xmpp,代码行数:25,代码来源:worker.py
示例5: _vrfy_response
def _vrfy_response(self, r):
    """Look for the strings that occur only in valid responses.

    Returns the response unchanged when all of them are present,
    otherwise False.
    """
    required_markers = (
        'userAcceptedTradeIDs',
        'tradeOffers',
        'defaultGame.Communication.VO.TradeWindow.dTradeWindowResultVO',
    )
    if all(search(marker, r) for marker in required_markers):
        return r
    return False
开发者ID:internety,项目名称:smarTSO,代码行数:7,代码来源:TradeDumper.py
示例6: extract_config
def extract_config(file_path, decomp_jar):
    """Extract the QRat configuration from the jar at file_path.

    The 'e-data' zip entry starts with an 8-byte big-endian seed; the rest is
    decoded byte by byte with a JavaRandom stream seeded from it, written to a
    temporary jar, decompiled with decomp_jar and scanned for the `Utils.*`
    settings.

    Returns a dict with the ServerHost<N>, ServerPort,
    InstanceControlPortAgent and InstanceControlPortClient values that were
    found, or None when decomp_jar is missing, no usable 'e-data' entry
    exists, or any step fails.
    """
    if not decomp_jar:
        return None

    enckey = False
    coded_data = None
    ret = {}

    try:
        with ZipFile(file_path, 'r') as archive:
            for name in archive.namelist():
                if name == 'e-data':
                    coded_data = archive.read(name)
                    enckey = unpack('>Q', coded_data[:8])[0]

        if not (enckey and coded_data):
            return None

        # One key per payload byte, drawn in order; bytearray yields ints on
        # both Python 2 and 3 and avoids quadratic string concatenation.
        java_rand = JavaRandom(enckey)
        decoded = bytearray(
            (byte - java_rand.nextInt(255) + 256) % 256
            for byte in bytearray(coded_data[8:])
        )
        decoded_path = store_temp_file(bytes(decoded), "qrat.jar")

        try:
            proc = Popen(["java", "-jar", decomp_jar, decoded_path], stdout=PIPE)
            decompiled_data = proc.communicate()[0]

            match = re.search(r"Utils\.serverHost = new String\[\] \{(?P<stringlist>[^};\r\n]*)\};", decompiled_data)
            if match:
                hostlist = match.group('stringlist').split(',')
                for i, host in enumerate(hostlist):
                    ret["ServerHost" + str(i)] = host.strip(" \"")

            match = re.search(r"Utils\.serverPort = (?P<portnum>\d+);", decompiled_data)
            if match:
                ret["ServerPort"] = int(match.group('portnum'))

            match = re.search(r"Utils\.instanceControlPortAgent = (?P<portnum>\d+);", decompiled_data)
            if match:
                ret["InstanceControlPortAgent"] = int(match.group('portnum'))

            match = re.search(r"Utils\.instanceControlPortClient = (?P<portnum>\d+);", decompiled_data)
            if match:
                ret["InstanceControlPortClient"] = int(match.group('portnum'))

            return ret
        finally:
            # Always remove the temporary jar, even when decompiling or
            # parsing failed.
            try:
                os.unlink(decoded_path)
            except OSError:
                pass
    except Exception:
        # Best effort: any failure means "no config"; KeyboardInterrupt and
        # SystemExit are deliberately no longer swallowed.
        pass

    return None
开发者ID:453483289,项目名称:cuckoo-modified,代码行数:58,代码来源:qrat.py
示例7: _ignore_link_pattern
def _ignore_link_pattern(url: typing.Optional[str]) -> bool:
    """Return True if the url or its lossily normalized form matches the ignore link pattern.

    Arguments:
    url - url to test; None never matches

    Return:
    True if either form of the url matches IGNORE_LINK_PATTERN (case-insensitive)
    """
    if url is None:
        return False

    pattern = mediawords.tm.extract_story_links.IGNORE_LINK_PATTERN
    normalized_url = mediawords.util.url.normalize_url_lossy(url)

    # Collapse the match object (or None) to a real bool, as the annotation promises.
    matched = re2.search(pattern, url, re2.I) or re2.search(pattern, normalized_url, re2.I)
    return bool(matched)
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:9,代码来源:fetch_link.py
示例8: parsesections
def parsesections(pattern, pattern_replace, section):
    """Rewrite every span of `section` matched by `pattern` until none is left.

    `pattern` must define at least two groups.  For each match, the text from
    the start of group 1 to the end of group 2 is passed through
    re.sub(pattern_replace[0], pattern_replace[1] % group(2), ...), and the
    single character following group 2 is dropped.  Returns the rewritten
    string.
    """
    while True:
        found = re.search(pattern, section)
        if not found:
            return section

        begin = found.start(1)
        finish = found.end(2)
        replacement = pattern_replace[1] % found.group(2)
        rewritten = re.sub(pattern_replace[0], replacement, section[begin:finish])
        # Skip the one character right after group 2 when stitching back.
        section = section[:begin] + rewritten + section[finish + 1:]
开发者ID:aih,项目名称:uscites,代码行数:9,代码来源:autoparser.py
示例9: _filter
def _filter(source):
    """Extract and decode the payload (original file) from `source`.

    The payload is stored reversed and base64-encoded in the variable that is
    passed to the innermost call of the `eval(...)` wrapper.

    Raises UnpackingError when the wrapper or the variable cannot be found,
    or when the payload cannot be base64-decoded.
    """
    try:
        varname = re.search(r'eval\(\w+\(\w+\((\w+)\)\)\);', source).group(1)
        reverse = re.search(r"var +%s *\= *'(.*)';" % varname, source).group(1)
    except AttributeError:
        raise UnpackingError('Malformed MyObfuscate data.')
    try:
        return base64.b64decode(reverse[::-1].encode('utf8')).decode('utf8')
    except (TypeError, ValueError):
        # Python 2 raises TypeError for bad base64; Python 3 raises
        # binascii.Error, which is a ValueError subclass.
        raise UnpackingError('MyObfuscate payload is not base64-encoded.')
开发者ID:453483289,项目名称:cuckoo-modified,代码行数:11,代码来源:myobfuscate.py
示例10: test_search_star_plus
def test_search_star_plus(self):
    """Check the spans search() and match() report for `*` and `+` patterns."""
    # (function, pattern, text, expected span or None when nothing matches)
    cases = (
        (re.search, 'x*', 'axx', (0, 0)),
        (re.search, 'x+', 'axx', (1, 3)),
        (re.search, 'x', 'aaa', None),
        (re.match, 'a*', 'xxx', (0, 0)),
        (re.match, 'x*', 'xxxa', (0, 3)),
        (re.match, 'a+', 'xxx', None),
    )
    for func, pattern, text, expected in cases:
        found = func(pattern, text)
        if expected is None:
            self.assertEqual(found, None)
        else:
            self.assertEqual(found.span(0), expected)
            self.assertEqual(found.span(), expected)
开发者ID:PeterScott,项目名称:pyre2,代码行数:11,代码来源:test_re.py
示例11: on_call
def on_call(self, call, process):
    """Mark API calls that start a process or shell action with a hidden window."""
    api = call["api"]

    if api == "CreateProcessInternalW":
        command_line = call["arguments"]["command_line"].lower()
        # Handle Powershell CommandLine Arguments
        hidden_powershell = "powershell" in command_line and (
            re.search("-win[ ]+hidden", command_line)
            or re.search("-windowstyle[ ]+hidden", command_line)
        )
        # The CREATE_NO_WINDOW flag is only consulted when the PowerShell
        # check did not already fire.
        if hidden_powershell or call["flags"]["creation_flags"] == "CREATE_NO_WINDOW":
            self.mark_call()

    elif api == "ShellExecuteExW":
        if call["arguments"]["show_type"] == 0:
            self.mark_call()
开发者ID:RicoVZ,项目名称:community,代码行数:13,代码来源:stealth_window.py
示例12: run
def run(self, results):
    """Run Moloch to import pcap.

    Reads the moloch-capture settings from self.options, derives the task id
    from the dump.pcap path and builds the moloch-capture command line.

    @param results: analysis results; handed back unchanged when the task id
                    cannot be derived from the pcap path.
    @return: results if the task id is missing, otherwise nothing.
    """
    self.key = "moloch"
    self.alerthash = {}
    self.MOLOCH_CAPTURE_BIN = self.options.get("capture", None)
    self.MOLOCH_CAPTURE_CONF = self.options.get("captureconf", None)
    self.CUCKOO_INSTANCE_TAG = self.options.get("node", None)
    self.MOLOCH_USER = self.options.get("user", None)
    self.MOLOCH_PASSWORD = self.options.get("pass", None)
    self.MOLOCH_REALM = self.options.get("realm", None)
    self.pcap_path = os.path.join(self.analysis_path, "dump.pcap")
    self.MOLOCH_URL = self.options.get("base", None)

    m = re.search(r"/(?P<task_id>\d+)/dump.pcap$", self.pcap_path)
    if m is None:
        log.warning("Unable to find task id from %s" % (self.pcap_path))
        return results
    self.task_id = m.group("task_id")

    # An unset option is None; os.path.exists(None) raises TypeError, so
    # treat "not configured" the same as "file missing".
    if not self.MOLOCH_CAPTURE_BIN or not os.path.exists(self.MOLOCH_CAPTURE_BIN):
        log.warning("Unable to Run moloch-capture: BIN File %s Does Not Exist" % (self.MOLOCH_CAPTURE_BIN))
        return

    if not self.MOLOCH_CAPTURE_CONF or not os.path.exists(self.MOLOCH_CAPTURE_CONF):
        log.warning("Unable to Run moloch-capture Conf File %s Does Not Exist" % (self.MOLOCH_CAPTURE_CONF))
        return

    try:
        cmd = "%s -c %s -r %s -n %s -t %s:%s" % (self.MOLOCH_CAPTURE_BIN,self.MOLOCH_CAPTURE_CONF,self.pcap_path,self.CUCKOO_INSTANCE_TAG,self.CUCKOO_INSTANCE_TAG,self.task_id)
    except Exception as e:
        log.warning("Unable to Build Basic Moloch CMD: %s" % e)
开发者ID:Cyber-Forensic,项目名称:cuckoo-modified,代码行数:33,代码来源:moloch.py
示例13: add_post
def add_post():
    """Create a post from the current request and redirect to it.

    Reads `text`, `tags` and `private` from the request arguments, takes the
    recipients from leading @login mentions in the text, flags the session so
    the post input gets cleared, then adds the post.  Renders the post-error
    page when the post text is rejected.
    """
    text = env.request.args('text', '').strip()

    raw_tags = env.request.args('tags', '').strip(' \t*,;')
    if isinstance(raw_tags, str):
        raw_tags = raw_tags.decode('utf-8')
    # Tags are separated by commas, semicolons or asterisks; non-breaking
    # spaces inside a tag become plain spaces.
    tags = [tag.replace(u"\xa0", " ") for tag in re.split(r'\s*[,;*]\s*', raw_tags)]

    private = bool(env.request.args('private'))

    recipients = re.search(r'^\s*(?P<to>(?:@[a-z0-9_-]+[,\s]*)+)', text)
    if recipients:
        to = parse_logins(recipients.group('to'))
    else:
        to = []

    files = _files([])

    sess = Session()
    sess['clear_post_input'] = True
    sess.save()

    try:
        post_id = posts.add_post(text, tags=tags, to=to, private=private, files=files)
    except PostTextError:
        return render('/post-error.html')

    target = '%s://%s.%s/%s' % (env.request.protocol,
                                env.user.login, settings.domain, post_id)
    return Response(redirect=target)
开发者ID:ap-Codkelden,项目名称:point-www,代码行数:27,代码来源:blog.py
示例14: on_complete
def on_complete(self):
    """Collect C2 and payment indicators and report whether the signature fired.

    Returns True right away when self.sigchanged is set, False when nothing
    was found, and True after filling self.data otherwise.
    """
    if self.sigchanged:
        return True
    if not self.found:
        return False

    if self.c2s:
        self.data.extend({"C2": c2} for c2 in self.c2s)

    if self.carve_mem and "procmemory" in self.results and self.results["procmemory"]:
        # Memory dump of the first process whose pid equals self.found.
        dump_path = next(
            (proc["file"] for proc in self.results["procmemory"]
             if proc["pid"] == int(self.found)),
            "",
        )
        if dump_path:
            with open(dump_path, "rb") as dump_file:
                memory = dump_file.read()
            # NUL-terminated, comma-separated list starting with a dotted quad.
            carved = re.search(r"(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3},[\d.,]+)\x00", memory)
            if carved:
                for c2 in carved.group(1).split(","):
                    entry = {"C2": c2}
                    if entry not in self.data:
                        self.data.append(entry)

    if self.payment:
        self.data.extend({"Payment": url} for url in self.payment)

    return True
开发者ID:453483289,项目名称:community-modified,代码行数:34,代码来源:locky_apis.py
示例15: getMatches
def getMatches(regex):
    """Yield (url, matched text, context) for each file under ./public/rawonions that matches.

    The url is the file name with '+' turned back into '/'.  Only the first
    match per file is reported; the context is the match plus up to `width`
    characters on either side.  Files on which the regex engine raises
    RegexError are reported on stdout and skipped.
    """
    for root, _dirs, files in os.walk("./public/rawonions", topdown=False):
        for name in files:
            filename = os.path.join(root, name)
            print("fname = " + filename)
            url = name.replace('+', '/')
            try:
                with open(filename, 'r') as handle:
                    text = handle.read()
                    match = re2.search(regex, text)
                    if match is not None:
                        begin, end = match.span()
                        context = text[max(0, (begin - width)):(end + width)]
                        yield (url, match.group(), context)
            except re2.RegexError:
                print("Had a regex error.")
开发者ID:chauncyc,项目名称:regulaTOR,代码行数:25,代码来源:regex.py
示例16: content_matches_topic
def content_matches_topic(content: str, topic: dict, assume_match: bool = False) -> bool:
    """Tell whether the content matches the topic['pattern'] regex.

    To avoid the occasional very long regex check, only the first megabyte
    of the content is examined.

    Arguments:
    content - text content
    topic - topic dict from db
    assume_match - assume that the content matches

    Return:
    True if the content matches the topic pattern
    """
    if assume_match:
        return True

    if content is None:
        return False

    max_length = 1024 * 1024
    head = content[0:max_length]

    # In production (not reproducible in dev) a few fields arrive from the
    # database as bytes objects, which re2.search cannot handle, so decode
    # them first.
    if isinstance(head, bytes):
        head = head.decode('utf8', 'backslashreplace')

    flags = re2.I | re2.X | re2.S
    return re2.search(topic['pattern'], head, flags) is not None
开发者ID:berkmancenter,项目名称:mediacloud,代码行数:29,代码来源:fetch_link.py
示例17: search
def search(self, regex, flags=0, all=False):
    """Search the dumped memory chunks for `regex`.

    Every chunk of every map in self.address_space is read from
    self.dumpfile (chunk["end"] - chunk["start"] bytes at chunk["offset"])
    and searched on its own.

    @param regex: pattern handed to the re module.
    @param flags: re flags.
    @param all: when true, collect every match instead of stopping at the first.
    @return: with all=True, a dict with "detail" (a list of
             {"match": [match objects], "chunk": chunk} for chunks that
             matched) and "matches" (every matched string); otherwise
             {"match": match object, "chunk": chunk} for the first hit, or
             None when nothing matches.
    """
    if all:
        result = {"detail": []}
        matches = []
        for region in self.address_space:
            for chunk in region["chunks"]:
                self.dumpfile.seek(chunk["offset"])
                data = self.dumpfile.read(chunk["end"] - chunk["start"])
                chunk_matches = list(re.finditer(regex, data, flags))
                if chunk_matches:
                    matches.extend(m.group(0) for m in chunk_matches)
                    result["detail"].append({"match": chunk_matches, "chunk": chunk})
        result["matches"] = matches
        return result

    for region in self.address_space:
        for chunk in region["chunks"]:
            self.dumpfile.seek(chunk["offset"])
            match = re.search(regex, self.dumpfile.read(chunk["end"] - chunk["start"]), flags)
            if match:
                return {"match": match, "chunk": chunk}
    return None
开发者ID:CIRCL,项目名称:cuckoo-modified,代码行数:31,代码来源:objects.py
示例18: render_pony
def render_pony(name, text, balloonstyle, width=80, center=False, centertext=False):
    """Render the named pony, with a speech balloon, as one string.

    The first line containing `$balloon<minwidth>$` is replaced by the
    rendered balloon, the first `$$$` ... `$$$` delimited section is removed,
    and the `$\\$` / `$/$` link placeholders are replaced by the last two
    entries of balloonstyle (or by nothing when text is empty).  Every line
    is cut to at most `width` visible characters and followed by a colour
    reset; with center=True the pony is indented to sit in the middle of
    `width`.
    """
    pony = load_pony(name)
    link_l = link_r = ''
    if text:
        link_l, link_r = balloonstyle[-2:]

    for i, line in enumerate(pony):
        match = re.search(r'\$balloon([0-9]*)\$', line)
        if match:
            minwidth = int(match.group(1) or '0')
            pad = str.center if centertext else str.ljust
            pony[i:i+1] = render_balloon(text, balloonstyle, minwidth=minwidth, maxwidth=int(width/2), pad=pad)
            break

    try:
        first = pony.index('$$$')
        second = pony[first+1:].index('$$$')
        pony[first:] = pony[first+1+second+1:]
    except ValueError:
        # No (complete) $$$ section to strip.
        pass

    pony = [line.replace('$\\$', link_l).replace('$/$', link_r) for line in pony]

    indent = ''
    if center:
        # Width without colour escapes and leftover $...$ placeholders.
        ponywidth = max(len(re.sub(r'\x1B\[[0-9;]+m|\$.*\$', '', line)) for line in pony)
        indent = ' ' * int((width - ponywidth) / 2)

    # Up to `width` characters, each optionally preceded by escape sequences.
    wre = re.compile(r'((\x1B\[[0-9;]+m)*.){0,%s}' % width)
    # Colour reset; it needs the leading ESC byte to be an escape sequence.
    reset = '\x1B[39;49m\n'
    return indent + (reset + indent).join(wre.search(line).group() for line in pony) + reset
开发者ID:jaseg,项目名称:ponysay,代码行数:25,代码来源:__init__.py
示例19: set_param
def set_param(param, value):
    """Set profile/info parameter, add/remove account.

    Parameter names may only contain letters, digits, underscores and dots;
    anything else raises KeyError.  For account fields a leading '-' in the
    value removes the account and an optional leading '+' adds it.  A
    ValueError raised while storing the value is turned into the
    'profile_value_err' template.
    """
    value = value.strip()
    try:
        if re.search(r'[^a-z0-9_\.]', param, re.I):
            raise KeyError

        if param.startswith('info.') and param[5:] in fields['info']:
            env.user.set_info(param[5:], value)
        elif param in fields['account']:
            if value.startswith('-'):
                return del_account(param, value[1:].strip())
            if value.startswith('+'):
                value = value[1:]
            return add_account(param, value.strip())
        else:
            env.user.set_profile(param, value)

        env.user.save()
    except ValueError as e:
        shown = e.message or value
        return xmpp_template('profile_value_err', value=shown)
开发者ID:radjah,项目名称:point-xmpp,代码行数:29,代码来源:users.py
示例20: on_call
def on_call(self, call, process):
    """Track the registry, crypto and socket calls used to profile the sample.

    Registry reads supply the computer name and user name (a failed user
    name read sets self.is_xp), CryptHashData buffers are collected in
    self.crypted, and connect/send/recv calls maintain a small socket -> ip
    map from which the payload ip is recorded.  Always returns None.
    """
    api = call["api"]

    if api == "RegQueryValueExA":
        # There are many more ways to get the computer name, this is the
        # pattern observed with all Dridex varients 08/14 - 03/15 so far.
        testkey = self.get_argument(call, "FullName").lower()
        if testkey == "hkey_local_machine\\system\\controlset001\\control\\computername\\computername\\computername":
            buf = self.get_argument(call, "Data")
            if buf:
                self.compname = buf.lower()
        if testkey == "hkey_current_user\\volatile environment\\username":
            if not call["status"]:
                self.is_xp = True
            else:
                buf = self.get_argument(call, "Data")
                if buf:
                    self.username = buf.lower()

    elif api == "CryptHashData":
        self.crypted.append(self.get_argument(call, "Buffer").lower())

    elif api in ("connect", "send", "recv"):
        if not self.extract:
            return None
        socknum = str(self.get_argument(call, "socket"))

        if api == "connect":
            if socknum and socknum not in self.sockmon:
                self.sockmon[socknum] = ""
            self.sockmon[socknum] = self.get_argument(call, "ip")

        elif socknum and socknum in self.sockmon:
            buf = self.get_argument(call, "buffer")
            if api == "send":
                # POST is a stable indicator observed so far
                if buf and buf[:4] == "POST":
                    self.payloadip["send"] = self.sockmon[socknum]
            elif buf:
                clen = re.search(r"Content-Length:\s([^\s]+)", buf)
                if clen and int(clen.group(1)) > 100000:
                    # Just a sanity check to make sure the IP hasn't changed
                    # since this is a primitive send/recv monitor
                    if "send" in self.payloadip and self.sockmon[socknum] == self.payloadip["send"]:
                        self.payloadip["recv"] = self.sockmon[socknum]

    return None
开发者ID:KillerInstinct,项目名称:community-modified,代码行数:60,代码来源:dridex_apis.py
注:本文中的re2.search函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论