本文整理汇总了Python中xlog.warn函数的典型用法代码示例。如果您正苦于以下问题:Python warn函数的具体用法?Python warn怎么用?Python warn使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了warn函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test2
def test2(self):
    """Probe which cipher suites from ``self.cipher_list`` work against ``self.ip``.

    Each candidate cipher is appended to the working list, an SSL context is
    built from the whole list, and a connection is attempted. The candidate
    is dropped again when the reported server type does not contain "gws"
    or when the attempt raises. The surviving list is logged at the end as
    a colon-terminated string.
    """
    work_ciphers = ["AES128-SHA"]
    for cipher in self.cipher_list:
        if cipher in work_ciphers:
            continue
        work_ciphers.append(cipher)
        xlog.debug("%s", cipher)

        openssl_context = SSLConnection.context_builder(ca_certs=g_cacertfile, cipher_suites=work_ciphers)
        try:
            ssl, _, _ = connect_ssl(self.ip, openssl_context=openssl_context)
            server_type = test_server_type(ssl, self.ip)
            xlog.debug("%s", server_type)
            if "gws" not in server_type:
                work_ciphers.remove(cipher)
        except Exception as e:
            xlog.warn("err:%s", e)
            # The cipher may already have been removed above; check instead
            # of swallowing every exception with a bare except.
            if cipher in work_ciphers:
                work_ciphers.remove(cipher)

    work_str = "".join(cipher + ":" for cipher in work_ciphers)
    xlog.info("work ciphers:%s", work_str)
开发者ID:new-xd,项目名称:XX-Net,代码行数:29,代码来源:check_ip.py
示例2: save
def save(self):
    """Write the user-specific settings to ``data/gae_proxy/config.ini``.

    Sections written: ``[gae]`` (only when an appid is set), ``[proxy]``,
    ``[hosts]`` (only when the appengine mode is not "gae") and
    ``[google_ip]`` (only the options that differ from ``DEFAULT_CONFIG``).
    Any failure is logged as a warning; nothing is raised to the caller.
    """
    CONFIG_USER_FILENAME = os.path.abspath(os.path.join(root_path, 'data', 'gae_proxy', 'config.ini'))
    try:
        # The context manager closes the file even if a write or an int()
        # conversion below raises; the original leaked the handle then.
        with open(CONFIG_USER_FILENAME, 'w') as f:
            if self.user_special.appid != "":
                f.write("[gae]\n")
                f.write("appid = %s\n" % self.user_special.appid)
                f.write("password = %s\n\n" % self.user_special.password)

            f.write("[proxy]\n")
            f.write("enable = %s\n" % self.user_special.proxy_enable)
            f.write("type = %s\n" % self.user_special.proxy_type)
            f.write("host = %s\n" % self.user_special.proxy_host)
            f.write("port = %s\n" % self.user_special.proxy_port)
            f.write("user = %s\n" % self.user_special.proxy_user)
            f.write("passwd = %s\n\n" % self.user_special.proxy_passwd)

            if self.user_special.host_appengine_mode != "gae":
                f.write("[hosts]\n")
                f.write("appengine.google.com = %s\n" % self.user_special.host_appengine_mode)
                f.write("www.google.com = %s\n\n" % self.user_special.host_appengine_mode)

            f.write("[google_ip]\n")
            if int(self.user_special.auto_adjust_scan_ip_thread_num) != self.DEFAULT_CONFIG.getint('google_ip', 'auto_adjust_scan_ip_thread_num'):
                f.write("auto_adjust_scan_ip_thread_num = %d\n\n" % int(self.user_special.auto_adjust_scan_ip_thread_num))
            if int(self.user_special.scan_ip_thread_num) != self.DEFAULT_CONFIG.getint('google_ip', 'max_scan_ip_thread_num'):
                f.write("max_scan_ip_thread_num = %d\n\n" % int(self.user_special.scan_ip_thread_num))
            if int(self.user_special.use_ipv6) != self.DEFAULT_CONFIG.getint('google_ip', 'use_ipv6'):
                f.write("use_ipv6 = %d\n\n" % int(self.user_special.use_ipv6))
    except Exception:
        xlog.warn("launcher.config save user config fail:%s", CONFIG_USER_FILENAME)
开发者ID:figo2002,项目名称:XX-Net,代码行数:35,代码来源:web_control.py
示例3: do_POST
def do_POST(self):
    """Handle a POST request to the web-control server.

    Requests carrying a Referer whose host does not start with
    ``127.0.0.1`` or ``localhost`` are refused (logged, no response
    written). Requests with no Referer, or one that cannot be parsed, are
    let through as before. The body is parsed into ``self.postvars`` and
    the request is dispatched by path; unknown paths get a 404.
    """
    refer = self.headers.getheader('Referer')
    netloc = None
    if refer:
        try:
            netloc = urlparse.urlparse(refer).netloc
        except Exception:
            # Best effort: an unparsable Referer skips the check.
            netloc = None

    # The original called the misspelled ``startswitch``; the AttributeError
    # was swallowed by a bare except, so no request was ever refused.
    # The check is kept outside the try so it cannot be masked again.
    if netloc is not None and not netloc.startswith(("127.0.0.1", "localhost")):
        xlog.warn("web control ref:%s refuse", netloc)
        return

    xlog.debug ('GAEProxy web_control %s %s %s ', self.address_string(), self.command, self.path)
    try:
        ctype, pdict = cgi.parse_header(self.headers.getheader('content-type'))
        if ctype == 'multipart/form-data':
            self.postvars = cgi.parse_multipart(self.rfile, pdict)
        elif ctype == 'application/x-www-form-urlencoded':
            length = int(self.headers.getheader('content-length'))
            self.postvars = urlparse.parse_qs(self.rfile.read(length), keep_blank_values=1)
        else:
            self.postvars = {}
    except Exception:
        # Missing or malformed headers/body: treat as an empty form.
        self.postvars = {}

    path = urlparse.urlparse(self.path).path
    if path == '/deploy':
        return self.req_deploy_handler()
    elif path == "/config":
        return self.req_config_handler()
    elif path == "/scan_ip":
        return self.req_scan_ip_handler()
    elif path.startswith("/importip"):
        return self.req_importip_handler()
    else:
        self.wfile.write(b'HTTP/1.1 404\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\n404 Not Found')
        xlog.info('%s "%s %s HTTP/1.1" 404 -', self.address_string(), self.command, self.path)
开发者ID:hzg0102,项目名称:XX-Net,代码行数:34,代码来源:web_control.py
示例4: check_win10
def check_win10():
    """Return True when running on Windows with major version 10.

    Returns False on non-Windows platforms, when ``RtlGetVersion`` reports
    a non-zero status, or when the major version is not 10.
    """
    if sys.platform != "win32":
        return False

    import ctypes

    class OSVERSIONINFOEXW(ctypes.Structure):
        # Field list passed to ntdll's RtlGetVersion.
        _fields_ = [
            ("dwOSVersionInfoSize", ctypes.c_ulong),
            ("dwMajorVersion", ctypes.c_ulong),
            ("dwMinorVersion", ctypes.c_ulong),
            ("dwBuildNumber", ctypes.c_ulong),
            ("dwPlatformId", ctypes.c_ulong),
            ("szCSDVersion", ctypes.c_wchar * 128),
            ("wServicePackMajor", ctypes.c_ushort),
            ("wServicePackMinor", ctypes.c_ushort),
            ("wSuiteMask", ctypes.c_ushort),
            ("wProductType", ctypes.c_byte),
            ("wReserved", ctypes.c_byte),
        ]

    version_info = OSVERSIONINFOEXW()
    version_info.dwOSVersionInfoSize = ctypes.sizeof(version_info)

    status = ctypes.windll.Ntdll.RtlGetVersion(ctypes.byref(version_info))
    if status != 0:
        xlog.warn("Failed to get win32 OS version")
        return False

    is_win10 = version_info.dwMajorVersion == 10
    if is_win10:
        xlog.info("detect Win10, enable connect concurent control.")
    return is_win10
开发者ID:reteng,项目名称:XX-Net,代码行数:33,代码来源:connect_control.py
示例5: fetch
def fetch(method, host, path, headers, payload, bufsize=8192):
    """Send one HTTP/1.1 request over a pooled SSL connection to ``host``.

    Returns None when no connection is available or when the status line
    is bad. Otherwise returns the ``httplib.HTTPResponse`` with the socket
    attached as ``response.ssl_sock``; the object is also returned when
    ``begin()`` fails with any other exception, after a warning is logged.
    """
    header_lines = "".join("%s: %s\r\n" % (k, v) for k, v in headers.items())
    request_data = "%s %s HTTP/1.1\r\n" % (method, path) + header_lines + "\r\n"

    ssl_sock = https_manager.get_ssl_connection(host)
    if not ssl_sock:
        return

    ssl_sock.send(request_data.encode())

    # Push the body in slices of at most 65535 bytes, advancing by however
    # many bytes each send() call reports.
    total = len(payload)
    offset = 0
    while offset < total:
        chunk_len = min(total - offset, 65535)
        offset += ssl_sock.send(payload[offset : offset + chunk_len])

    response = httplib.HTTPResponse(ssl_sock, buffering=True)
    response.ssl_sock = ssl_sock
    try:
        saved_timeout = ssl_sock.gettimeout()
        ssl_sock.settimeout(90)
        response.begin()
        ssl_sock.settimeout(saved_timeout)
    except httplib.BadStatusLine as e:
        xlog.warn("direct_handler.fetch bad status line:%r", e)
        response = None
    except Exception as e:
        xlog.warn("direct_handler.fetch:%r", e)
    return response
开发者ID:hzg0102,项目名称:XX-Net,代码行数:31,代码来源:direct_handler.py
示例6: remove_ip_process
def remove_ip_process(self):
    """Drain ``self.to_remove_ip_list``, re-testing each IP before dropping it.

    An IP that passes the check again is restored through ``add_ip``. If
    the network itself is reported down, the IP is put back on the queue
    and processing stops. Otherwise the removal is logged and the list is
    flagged for saving. The worker counter is always decremented on exit.
    """
    try:
        while True:
            try:
                candidate = self.to_remove_ip_list.get_nowait()
            except:
                # Any failure to fetch an item ends the loop.
                break

            outcome = check_ip.test(candidate)
            if outcome and outcome.appspot_ok:
                self.add_ip(candidate, outcome.handshake_time, outcome.domain, outcome.server_type)
                xlog.debug("remove ip process, restore ip:%s", candidate)
                continue

            if check_ip.network_is_ok():
                xlog.info("real remove ip:%s ", candidate)
                self.iplist_need_save = 1
                continue

            # Network is down: the failure says nothing about this IP.
            self.to_remove_ip_list.put(candidate)
            xlog.warn("network is unreachable. check your network connection.")
            return
    finally:
        self.remove_ip_thread_num_lock.acquire()
        self.remove_ip_thread_num -= 1
        self.remove_ip_thread_num_lock.release()
开发者ID:az0ne,项目名称:XX-Net,代码行数:26,代码来源:google_ip.py
示例7: keep_alive_thread
def keep_alive_thread(self):
    """Once per second, prune or refresh pooled connections while ``self.keep_alive`` is truthy.

    Connections returned by the new-connection pool's
    ``get_need_keep_alive`` are closed. Those returned by the GAE pool are
    closed when the pool is larger than half the working appid count (at
    least 1); otherwise they are kept if ``head_request`` succeeds.
    Exceptions in a cycle are logged and the loop continues.
    """
    while self.keep_alive:
        time.sleep(1)
        try:
            for stale_sock in self.new_conn_pool.get_need_keep_alive(maxtime=self.keep_alive-3):
                stale_sock.close()

            for gae_sock in self.gae_conn_pool.get_need_keep_alive(maxtime=self.keep_alive-3):
                # Only keep a few links alive: with 25 appids, keep 5.
                keep_limit = max(1, len(appid_manager.working_appid_list)/2)
                if self.gae_conn_pool.qsize() > keep_limit:
                    gae_sock.close()
                    continue

                if not self.head_request(gae_sock):
                    gae_sock.close()
                else:
                    self.save_ssl_connection_for_reuse(gae_sock)

            self.create_more_connection()
        except Exception as e:
            xlog.warn("keep alive except:%r", e)
开发者ID:az0ne,项目名称:XX-Net,代码行数:26,代码来源:connect_manager.py
示例8: load
def load(self):
    """Load the default and user configuration into ``self.user_special``.

    The default file (``proxy.ini``) is read before the user file so that
    ``self.DEFAULT_CONFIG`` is populated even when no user config exists.
    The original returned before reading the defaults in that case,
    leaving ``DEFAULT_CONFIG`` empty for later ``getint`` lookups such as
    those in ``save``.

    Missing optional sections or options are skipped. The ``[proxy]``
    options are required; a failure there is logged as a warning.
    """
    ConfigParser.RawConfigParser.OPTCRE = re.compile(r'(?P<option>[^=\s][^=]*)\s*(?P<vi>[=])\s*(?P<value>.*)$')

    self.DEFAULT_CONFIG = ConfigParser.ConfigParser()
    DEFAULT_CONFIG_FILENAME = os.path.abspath(os.path.join(current_path, 'proxy.ini'))

    self.USER_CONFIG = ConfigParser.ConfigParser()
    CONFIG_USER_FILENAME = os.path.abspath(os.path.join(root_path, 'data', 'gae_proxy', 'config.ini'))

    try:
        if os.path.isfile(DEFAULT_CONFIG_FILENAME):
            self.DEFAULT_CONFIG.read(DEFAULT_CONFIG_FILENAME)
        else:
            return

        if os.path.isfile(CONFIG_USER_FILENAME):
            self.USER_CONFIG.read(CONFIG_USER_FILENAME)
        else:
            return

        try:
            self.user_special.appid = self.USER_CONFIG.get('gae', 'appid')
            self.user_special.password = self.USER_CONFIG.get('gae', 'password')
        except Exception:
            pass  # optional: keep the current values

        try:
            self.user_special.host_appengine_mode = self.USER_CONFIG.get('hosts', 'appengine.google.com')
        except Exception:
            pass  # optional

        try:
            self.user_special.ip_connect_interval = config.CONFIG.getint('google_ip', 'ip_connect_interval')
        except Exception:
            pass  # optional

        try:
            self.user_special.scan_ip_thread_num = config.CONFIG.getint('google_ip', 'max_scan_ip_thread_num')
        except Exception:
            # Fall back to the shipped default.
            self.user_special.scan_ip_thread_num = self.DEFAULT_CONFIG.getint('google_ip', 'max_scan_ip_thread_num')

        try:
            self.user_special.auto_adjust_scan_ip_thread_num = config.CONFIG.getint('google_ip', 'auto_adjust_scan_ip_thread_num')
        except Exception:
            pass  # optional

        try:
            self.user_special.use_ipv6 = config.CONFIG.getint('google_ip', 'use_ipv6')
        except Exception:
            pass  # optional

        self.user_special.proxy_enable = self.USER_CONFIG.get('proxy', 'enable')
        self.user_special.proxy_type = self.USER_CONFIG.get('proxy', 'type')
        self.user_special.proxy_host = self.USER_CONFIG.get('proxy', 'host')
        self.user_special.proxy_port = self.USER_CONFIG.get('proxy', 'port')
        self.user_special.proxy_user = self.USER_CONFIG.get('proxy', 'user')
        self.user_special.proxy_passwd = self.USER_CONFIG.get('proxy', 'passwd')
    except Exception as e:
        xlog.warn("User_config.load except:%s", e)
开发者ID:hzg0102,项目名称:XX-Net,代码行数:60,代码来源:web_control.py
示例9: load_tasks
def load_tasks():
    """Import every ``*.py`` file in ``config.tasks_path`` as a task module.

    Each module must expose ``name``, ``run_interval`` and ``init()``;
    ``init()`` is called once on load. A file that fails to load or
    initialise is logged and skipped.

    Returns:
        A ``(tasks, intervals)`` tuple of parallel lists: the loaded
        modules and their ``run_interval`` values.
    """
    import imp

    tasks = []
    intervals = []
    tasks_path = config.tasks_path
    for fileName in os.listdir(tasks_path):
        if not fileName.lower().endswith('.py'):
            continue

        xlog.info('start load %s' % fileName)
        file_path = os.path.join(tasks_path, fileName)
        try:
            with open(file_path, 'rb') as fpy:
                # Pass the file's own path as the pathname; the original
                # passed the directory.
                task = imp.load_source('Auto-tasks_%d_%s' % (len(tasks), fileName), file_path, fpy)
            name = task.name
            interval = task.run_interval
            task.init()
            tasks.append(task)
            intervals.append(interval)
            xlog.info('load task %s success.' % name)
        except Exception:
            xlog.warn('load %s fail.' % fileName)
    return (tasks, intervals)
开发者ID:Cat7373,项目名称:Auto-tasks,代码行数:27,代码来源:start.py
示例10: load_ip_range
def load_ip_range(self):
    """Rebuild the IP range tables from the text returned by ``load_range_content``.

    Blank lines and lines starting with ``#`` are ignored. Each remaining
    line is split into a begin/end pair and converted to numbers. Ranges
    that are invalid or fail to parse are logged and skipped. For every
    accepted range the running total ``candidate_amount_ip`` is used as the
    key/index, then increased by ``nend - nbegin``.
    """
    self.ip_range_map = {}
    self.ip_range_list = []
    self.ip_range_index = []
    self.candidate_amount_ip = 0

    for line in self.load_range_content().splitlines():
        if len(line) == 0 or line[0] == '#':
            continue

        try:
            begin, end = ip_utils.split_ip(line)
            nbegin = ip_utils.ip_string_to_num(begin)
            nend = ip_utils.ip_string_to_num(end)
            if not (nbegin and nend and nend >= nbegin):
                xlog.warn("load ip range:%s fail", line)
                continue
        except Exception as e:
            xlog.exception("load ip range:%s fail:%r", line, e)
            continue

        offset = self.candidate_amount_ip
        self.ip_range_map[offset] = [nbegin, nend]
        self.ip_range_list.append([nbegin, nend])
        self.ip_range_index.append(offset)
        self.candidate_amount_ip = offset + (nend - nbegin)

    self.ip_range_index.sort()
开发者ID:sun3596209,项目名称:XX-Net,代码行数:31,代码来源:google_ip_range.py
示例11: test
def test(ip_str, loop=1):
    """Run the app-head check and the server-type check on ``ip_str`` ``loop`` times.

    The outcome of the app-head check is stored in
    ``check.result.appspot_ok`` when it succeeds. The server-type check
    result is stored in ``check.result.server_type``.

    Returns:
        The ``Check_frame`` result object after the last iteration.
    """
    xlog.info("==>%s", ip_str)
    checker = Check_frame(ip_str, check_cert=False)

    for _ in range(loop):
        app_result = checker.check(callback=test_app_head)
        if app_result:
            xlog.debug("=======app check ok: %s", ip_str)
            checker.result.appspot_ok = app_result
        else:
            if "gws" in checker.result.server_type:
                xlog.warn("ip:%s server_type:%s but appengine check fail.", ip_str, checker.result.server_type)
            xlog.warn("check fail")

        type_result = checker.check(callback=test_server_type, check_ca=True)
        if not type_result:
            xlog.debug("test server type fail")
            continue

        checker.result.server_type = type_result
        xlog.info("========== %s type:%s domain:%s handshake:%d", ip_str, checker.result.server_type,
                  checker.result.domain, checker.result.handshake_time)

    return checker.result
开发者ID:new-xd,项目名称:XX-Net,代码行数:28,代码来源:check_ip.py
示例12: request
def request(headers=None, payload=None):
    """Send a request over a pooled SSL connection, trying up to three times.

    Args:
        headers: Request headers. The ``Host`` entry is set on this dict
            for every attempt. When omitted, a fresh dict is used for each
            call; the original shared one mutable default across calls, so
            a ``Host`` set by one call leaked into the next.
        payload: Request body passed through to ``_request``.

    Returns:
        The response object, with the socket attached as ``ssl_sock``.

    Raises:
        GAE_Exception: code 2 once every attempt has failed.
    """
    if headers is None:
        headers = {}

    max_retry = 3
    for i in range(max_retry):
        ssl_sock = None
        try:
            ssl_sock = https_manager.get_ssl_connection()
            if not ssl_sock:
                xlog.debug("create_ssl_connection fail")
                continue

            if ssl_sock.host == "":
                # First use of this socket: bind it to an appid.
                ssl_sock.appid = appid_manager.get_appid()
                if not ssl_sock.appid:
                    raise GAE_Exception(1, "no appid can use")
                headers["Host"] = ssl_sock.appid + ".appspot.com"
                ssl_sock.host = headers["Host"]
            else:
                headers["Host"] = ssl_sock.host

            response = _request(ssl_sock, headers, payload)
            if not response:
                ssl_sock.close()
                continue

            response.ssl_sock = ssl_sock
            return response
        except Exception as e:
            xlog.warn("request failed:%s", e)
            if ssl_sock:
                ssl_sock.close()
    raise GAE_Exception(2, "try max times")
开发者ID:cc419378878,项目名称:XX-Net,代码行数:32,代码来源:gae_handler.py
示例13: load
def load(self):
    """Read ``proxy.ini`` and the user ``config.ini`` into ``self.user_special``.

    Returns early when either file is missing; the default file is read
    first. Optional options that cannot be read are skipped. The
    ``[proxy]`` options are required, and any failure reaching the outer
    handler is logged as a warning.
    """
    ConfigParser.RawConfigParser.OPTCRE = re.compile(r"(?P<option>[^=\s][^=]*)\s*(?P<vi>[=])\s*(?P<value>.*)$")

    self.DEFAULT_CONFIG = ConfigParser.ConfigParser()
    DEFAULT_CONFIG_FILENAME = os.path.abspath(os.path.join(current_path, "proxy.ini"))

    self.USER_CONFIG = ConfigParser.ConfigParser()
    CONFIG_USER_FILENAME = os.path.abspath(os.path.join(root_path, "data", "gae_proxy", "config.ini"))

    try:
        if not os.path.isfile(DEFAULT_CONFIG_FILENAME):
            return
        self.DEFAULT_CONFIG.read(DEFAULT_CONFIG_FILENAME)
        self.user_special.scan_ip_thread_num = self.DEFAULT_CONFIG.getint("google_ip", "max_scan_ip_thread_num")

        if not os.path.isfile(CONFIG_USER_FILENAME):
            return
        self.USER_CONFIG.read(CONFIG_USER_FILENAME)

        special = self.user_special
        user_cfg = self.USER_CONFIG

        try:
            special.appid = user_cfg.get("gae", "appid")
            special.password = user_cfg.get("gae", "password")
        except:
            pass

        try:
            special.host_appengine_mode = user_cfg.get("hosts", "appengine.google.com")
        except:
            pass

        try:
            special.scan_ip_thread_num = config.CONFIG.getint("google_ip", "max_scan_ip_thread_num")
        except:
            special.scan_ip_thread_num = self.DEFAULT_CONFIG.getint("google_ip", "max_scan_ip_thread_num")

        try:
            special.auto_adjust_scan_ip_thread_num = config.CONFIG.getint(
                "google_ip", "auto_adjust_scan_ip_thread_num"
            )
        except:
            pass

        try:
            special.use_ipv6 = config.CONFIG.getint("google_ip", "use_ipv6")
        except:
            pass

        # Required proxy options, read in the same order as before.
        for option in ("enable", "type", "host", "port", "user", "passwd"):
            setattr(special, "proxy_" + option, user_cfg.get("proxy", option))
    except Exception as e:
        xlog.warn("User_config.load except:%s", e)
开发者ID:sun3596209,项目名称:XX-Net,代码行数:58,代码来源:web_control.py
示例14: request
def request(self, method, host, schema="http", path="/", headers={}, data="", timeout=40):
    """Tunnel one HTTP request through the front via a POST to ``/2/``.

    The request line and headers are packed as a length-prefixed head
    followed by a length-prefixed body. Success and failure counters on
    ``self`` are updated from the outer status. On a 404 the responding
    host is removed from ``self.host_manager``.

    Returns:
        ``(body, status, response)`` parsed from the tunnelled content, or
        ``("", 501, {})`` when that content cannot be parsed.
    """
    # Force schema to http: avoids cert failures on heroku curl, and all
    # x-servers provide ipv4 access.
    schema = "http"
    url = schema + "://" + host + path

    head_parts = ['%s %s HTTP/1.1\r\n' % (method, url)]
    for key in headers:
        head_parts.append('%s: %s\r\n' % (key, headers[key]))
    head_payload = "".join(head_parts)

    head_len = struct.pack('!H', len(head_payload))
    body_len = struct.pack('!I', len(data))
    request_body = '%s%s%s%s' % (head_len, head_payload, body_len, data)
    request_headers = {'Content-Length': len(data), 'Content-Type': 'application/octet-stream'}

    heroku_host = ""
    content, status, response = self._request(
        "POST", heroku_host, "/2/",
        request_headers, request_body, timeout)

    if status != 200:
        if status == 404:
            heroku_host = response.ssl_sock.host
            xlog.warn("heroku:%s fail", heroku_host)
            try:
                self.host_manager.remove(heroku_host)
            except:
                pass

        self.last_fail_time = time.time()
        self.continue_fail_num += 1
        self.fail_num += 1
    else:
        xlog.debug("%s %s%s trace:%s", method, host, path, response.task.get_trace())
        self.last_success_time = time.time()
        self.continue_fail_num = 0
        self.success_num += 1

    try:
        res = simple_http_client.TxtResponse(content)
    except:
        return "", 501, {}

    res.worker = response.worker
    res.task = response.task
    return res.body, res.status, res
开发者ID:chenqiuyan,项目名称:XX-Net,代码行数:56,代码来源:front.py
示例15: runJob
def runJob(self):
    """Check one IP from ``ip_range`` per second while ``self.check_num`` is below 1000000.

    A failure in an iteration is logged as a warning and the loop continues.
    """
    limit = 1000000
    while self.check_num < limit:
        try:
            time.sleep(1)
            candidate = ip_utils.ip_num_to_string(ip_range.get_ip())
            self.check_ip(candidate)
        except Exception as e:
            xlog.warn("google_ip.runJob fail:%s", e)
开发者ID:new-xd,项目名称:XX-Net,代码行数:10,代码来源:check_ip.py
示例16: req_deploy_handler
def req_deploy_handler(self):
    """Handle ``/deploy``: start, cancel or report on the uploader subprocess.

    The ``cmd`` query parameter selects the action:

    * ``deploy``  - start ``server/uploader.py`` with the posted appid,
      email, passwd and rc4_passwd, unless a deploy is already running.
    * ``cancel``  - kill the running deploy, if any.
    * ``get_log`` - return the status and the contents of ``upload.log``.

    A missing or unknown ``cmd`` produces an empty body. The response is
    sent as ``text/html`` and contains JSON.
    """
    global deploy_proc
    req = urlparse.urlparse(self.path).query
    reqs = urlparse.parse_qs(req, keep_blank_values=True)
    # .get(): a request without "cmd" used to raise an unhandled KeyError.
    cmd = reqs.get("cmd")
    data = ""

    log_path = os.path.abspath(os.path.join(current_path, os.pardir, "server", "upload.log"))
    time_now = datetime.datetime.today().strftime("%H:%M:%S-%a/%d/%b/%Y")

    if cmd == ["deploy"]:
        if deploy_proc and deploy_proc.poll() is None:
            xlog.warn("deploy is running, request denied.")
            data = '{"res":"deploy is running", "time":"%s"}' % (time_now)
        else:
            try:
                if os.path.isfile(log_path):
                    os.remove(log_path)
                script_path = os.path.abspath(os.path.join(current_path, os.pardir, "server", "uploader.py"))

                # Read inside the try so a missing form field yields an
                # error response instead of an unhandled KeyError.
                appid = self.postvars["appid"][0]
                email = self.postvars["email"][0]
                passwd = self.postvars["passwd"][0]
                rc4_passwd = self.postvars["rc4_passwd"][0]
                deploy_proc = subprocess.Popen([sys.executable, script_path, appid, email, passwd, rc4_passwd])
                xlog.info("deploy begin.")
                data = '{"res":"success", "time":"%s"}' % time_now
            except Exception as e:
                # json.dumps escapes quotes and backslashes in the message;
                # plain %-formatting could emit invalid JSON.
                data = json.dumps({"res": str(e), "time": time_now})
    elif cmd == ["cancel"]:
        if deploy_proc and deploy_proc.poll() is None:
            deploy_proc.kill()
            data = '{"res":"deploy is killed", "time":"%s"}' % (time_now)
        else:
            data = '{"res":"deploy is not running", "time":"%s"}' % (time_now)
    elif cmd == ["get_log"]:
        if deploy_proc and os.path.isfile(log_path):
            with open(log_path, "r") as f:
                content = f.read()
        else:
            content = ""

        status = "init"
        if deploy_proc:
            if deploy_proc.poll() is None:
                status = "running"
            else:
                status = "finished"

        data = json.dumps({"status": status, "log": content, "time": time_now})

    self.send_response("text/html", data)
开发者ID:raven0823,项目名称:XX-Net,代码行数:55,代码来源:web_control.py
示例17: req_deploy_handler
def req_deploy_handler(self):
    """Serve the ``/deploy`` endpoint that controls the uploader subprocess.

    Actions, chosen by the ``cmd`` query parameter:

    * ``deploy``  - launch ``server/uploader.py`` with the posted
      credentials unless a deploy is still running.
    * ``cancel``  - kill a running deploy.
    * ``get_log`` - report the status and the contents of ``upload.log``.

    Requests without a recognised ``cmd`` get an empty body. The reply is
    JSON sent with content type ``text/html``.
    """
    global deploy_proc
    req = urlparse.urlparse(self.path).query
    reqs = urlparse.parse_qs(req, keep_blank_values=True)
    # Avoid an unhandled KeyError when the query has no "cmd".
    cmd = reqs.get('cmd')
    data = ''

    log_path = os.path.abspath(os.path.join(current_path, os.pardir, "server", 'upload.log'))
    time_now = datetime.datetime.today().strftime('%H:%M:%S-%a/%d/%b/%Y')

    if cmd == ['deploy']:
        if deploy_proc and deploy_proc.poll() is None:
            xlog.warn("deploy is running, request denied.")
            data = '{"res":"deploy is running", "time":"%s"}' % (time_now)
        else:
            try:
                if os.path.isfile(log_path):
                    os.remove(log_path)
                script_path = os.path.abspath(os.path.join(current_path, os.pardir, "server", 'uploader.py'))

                # Field lookups are inside the try so a missing field
                # becomes an error response rather than a crash.
                appid = self.postvars['appid'][0]
                email = self.postvars['email'][0]
                passwd = self.postvars['passwd'][0]
                rc4_passwd = self.postvars['rc4_passwd'][0]
                deploy_proc = subprocess.Popen([sys.executable, script_path, appid, email, passwd, rc4_passwd])
                xlog.info("deploy begin.")
                data = '{"res":"success", "time":"%s"}' % time_now
            except Exception as e:
                # Serialise with json.dumps so quotes in the exception
                # text cannot break the JSON.
                data = json.dumps({'res': str(e), 'time': time_now})
    elif cmd == ['cancel']:
        if deploy_proc and deploy_proc.poll() is None:
            deploy_proc.kill()
            data = '{"res":"deploy is killed", "time":"%s"}' % (time_now)
        else:
            data = '{"res":"deploy is not running", "time":"%s"}' % (time_now)
    elif cmd == ['get_log']:
        if deploy_proc and os.path.isfile(log_path):
            with open(log_path, "r") as f:
                content = f.read()
        else:
            content = ""

        status = 'init'
        if deploy_proc:
            if deploy_proc.poll() is None:
                status = 'running'
            else:
                status = 'finished'

        data = json.dumps({'status': status, 'log': content, 'time': time_now})

    self.send_response('text/html', data)
开发者ID:hzg0102,项目名称:XX-Net,代码行数:55,代码来源:web_control.py
示例18: check_all_domain
def check_all_domain(check_ip):
    """Check every domain listed in ``front_domains.json`` and log the outcome.

    Each entry is prefixed with ``scan1.`` and passed to
    ``check_ip.check_ip`` together with the module-level ``ip`` and
    ``wait_time``.
    """
    domains_file = os.path.join(current_path, "front_domains.json")
    with open(domains_file, "r") as fd:
        domains = json.loads(fd.read())

    for domain in domains:
        host = "scan1." + domain
        res = check_ip.check_ip(ip, host=host, wait_time=wait_time)
        if res and res.ok:
            xlog.info("host:%s ok", host)
        else:
            xlog.warn("host:%s fail", host)
开发者ID:xiwanglr,项目名称:XX-Net,代码行数:11,代码来源:check_ip.py
示例19: check
def check(self, callback=None, check_ca=True, close_ssl=True):
    """Connect to ``self.ip`` over SSL, optionally verify the certificate, then run ``callback``.

    Args:
        callback: Optional callable invoked as ``callback(ssl_sock, ip)``;
            its return value becomes the result.
        check_ca: When true, inspect the peer certificate and record its
            CN in ``self.result.domain``.
        close_ssl: When true, close the socket before returning.

    Returns:
        The callback's return value, True when there is no callback, or
        False when the attempt fails.

    Raises:
        HoneypotError: re-raised when ``self.check_cert`` is set and the
            issuer CN does not start with "Google".
    """
    ssl_sock = None
    try:
        ssl_sock, self.result.connct_time, self.result.handshake_time = connect_ssl(
            self.ip, timeout=self.timeout, openssl_context=self.openssl_context)

        def verify_issuer(sock):
            # Verify the SSL certificate issuer.
            cert = sock.get_peer_certificate()
            if not cert:
                raise SSLError("no cert")

            issuer_commonname = ''
            for key, value in cert.get_issuer().get_components():
                if key == 'CN':
                    issuer_commonname = value
                    break

            if self.check_cert and not issuer_commonname.startswith('Google'):
                raise HoneypotError(' certficate is issued by %r, not Google' % ( issuer_commonname))

            ssl_cert = cert_util.SSLCert(cert)
            xlog.info("%s CN:%s", self.ip, ssl_cert.cn)
            self.result.domain = ssl_cert.cn

        if check_ca:
            verify_issuer(ssl_sock)

        if not callback:
            return True
        return callback(ssl_sock, self.ip)
    except HoneypotError as e:
        xlog.warn("honeypot %s", self.ip)
        raise e
    except SSLError as e:
        xlog.debug("Check_appengine %s SSLError:%s", self.ip, e)
    except IOError as e:
        xlog.warn("Check %s IOError:%s", self.ip, e)
    except httplib.BadStatusLine:
        # Ignored without logging.
        pass
    except Exception as e:
        errno_str = e.args[0] if len(e.args) > 0 else e.message
        xlog.exception('check_appengine %s %s err:%s', self.ip, errno_str, e)
    finally:
        if ssl_sock and close_ssl:
            ssl_sock.close()

    return False
开发者ID:new-xd,项目名称:XX-Net,代码行数:53,代码来源:check_ip.py
示例20: test_gws
def test_gws(ip_str):
    """Check whether ``ip_str`` reports a server type containing "gws".

    Returns:
        The ``Check_frame`` result object on success, with ``server_type``
        set; False otherwise.
    """
    xlog.info("==>%s", ip_str)
    checker = Check_frame(ip_str)
    server_type = checker.check(callback=test_server_type, check_ca=True)

    if server_type and "gws" in server_type:
        checker.result.server_type = server_type
        return checker.result

    xlog.warn("Server:%s not gws", server_type)
    return False
开发者ID:new-xd,项目名称:XX-Net,代码行数:12,代码来源:check_ip.py
注:本文中的xlog.warn函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论