本文整理汇总了Python中pyzabbix.ZabbixAPI类的典型用法代码示例。如果您正苦于以下问题:Python ZabbixAPI类的具体用法?Python ZabbixAPI怎么用?Python ZabbixAPI使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ZabbixAPI类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: check_cluster
def check_cluster(trigger):
    """Print a per-host summary of matching, unacknowledged triggers.

    Logs in to the Zabbix API, fetches monitored triggers whose description
    matches ``trigger`` (filtered on ``value`` == 1 and an unacknowledged
    last event), and prints either ``OK`` when nothing matched or a
    ``'; '``-joined list of ``<count> trigger(s) on <host>`` messages.

    Args:
        trigger: Text to search for in the trigger description.

    Returns:
        Always ``True``.
    """
    # Function-scope import: Counter is only needed here.
    from collections import Counter

    zapi = ZabbixAPI(zabbix_site)
    zapi.login(zabbix_user, zabbix_pass)

    trigger_notok = zapi.trigger.get(
        monitored=1,
        selectHosts=1,
        output=['hosts'],
        withLastEventUnacknowledged=1,
        search={'description': trigger},
        filter={"value": 1}
    )

    # Tally triggers per host id in a single pass instead of calling
    # list.count() once per distinct host (which was quadratic).
    triggers_per_host = Counter(
        int(trigger_item['hosts'][0]['hostid'])
        for trigger_item in trigger_notok
    )

    trigger_host_msglist = []
    for hostid, trigger_count in triggers_per_host.items():
        # Resolve the numeric host id to the host's name for the message.
        host_name = zapi.host.get(hostids=hostid)[0]['host']
        trigger_host_msglist.append(
            "{0} trigger(s) on {1}".format(trigger_count, host_name)
        )

    if not trigger_host_msglist:
        print("OK")
    else:
        print('; '.join(trigger_host_msglist))

    return True
开发者ID:mcsrainbow,项目名称:python-demos,代码行数:28,代码来源:pyzabbix_livestatus_global.py
示例2: process_base
class process_base(object):
    """Holds a logged-in ``ZabbixAPI`` session plus small time helpers."""

    def __init__(self, server='http://192.168.1.203:82', user='Admin',
                 password='zabbix'):
        """Connect and log in to the Zabbix API.

        Args:
            server: Base URL of the Zabbix frontend.
            user: Login name.
            password: Login password.

        The defaults reproduce the values that used to be hard-coded here.
        """
        self.zapi = ZabbixAPI(server)
        # Login to the Zabbix API
        self.zapi.login(user, password)
        # Kept under a private name: assigning ``self.zabbix_url`` directly
        # used to shadow the ``zabbix_url`` accessor defined on the class.
        self._zabbix_url = server

    def gid_2_group_list(self):
        """Return all host groups, requested with ``output='extend'``."""
        return self.zapi.hostgroup.get(output='extend')

    @property
    def zabbix_url(self):
        """Base URL this instance was created with (read by view.py)."""
        return self._zabbix_url

    @zabbix_url.setter
    def zabbix_url(self, value):
        # Keeps ``instance.zabbix_url = ...`` working as it did when this
        # was a plain instance attribute.
        self._zabbix_url = value

    def clock_2_time(self, clock_time):
        """Format a Unix timestamp as a local ``'%Y-%m-%d %X'`` string.

        Args:
            clock_time: Seconds since the epoch; anything ``int()`` accepts.
        """
        return datetime.fromtimestamp(int(clock_time)).strftime('%Y-%m-%d %X')

    def clock_2_timedelta(self, clock_time):
        """Describe a Unix timestamp as days/hours/minutes since the epoch.

        Args:
            clock_time: Seconds since the epoch; anything ``int()`` accepts.

        Returns:
            A string of the form ``'<d> days, <h> hours, <m> minutes'``.
        """
        time_delta = (datetime.fromtimestamp(int(clock_time))
                      - datetime.fromtimestamp(0))
        # Floor division keeps the parts integral on Python 3 as well.
        delta_hour, leftover_seconds = divmod(time_delta.seconds, 3600)
        delta_min = leftover_seconds // 60
        return '%s days, %s hours, %s minutes' % (
            time_delta.days, delta_hour, delta_min)
开发者ID:stefanmonkey,项目名称:zabbix_center,代码行数:34,代码来源:process.py
示例3: client
def client(**kwargs):
    """Return the shared ``ZabbixAPI`` handle, creating it on first use.

    Reads and rebinds the module-level ``zapi``. When it is falsy, a new
    handle is built from ``kwargs['url']``, logged in with
    ``kwargs['username']`` / ``kwargs['password']``, and the API version is
    printed.
    """
    global zapi
    if zapi:
        # Already connected: reuse the existing handle.
        return zapi

    zapi = ZabbixAPI(kwargs['url'])
    zapi.login(kwargs['username'], kwargs['password'])
    print(zapi.api_version())
    return zapi
开发者ID:meconlin,项目名称:zabbix-api-simple-cli,代码行数:7,代码来源:zabbix_update.py
示例4: main
def main(argv):
    """Parse command-line options, log in to Zabbix and poll alerts forever.

    Args:
        argv: Command-line arguments without the program name. ``-h``
            prints usage; ``-c``/``--configfile`` selects the config file
            (default ``/etc/zabbix_stats.cfg``).

    Side effects:
        Sets the module-level ``pp`` (pretty printer) and ``zapi`` (logged-in
        API handle), then calls ``Alerts()`` in an endless loop.

    Exits with status 2 on ``-h`` or a bad option, and with status 1 when
    the config file cannot be read.
    """
    global pp
    global zapi

    usage = 'zabbix_stats.py -c <configfile>'
    cfgfile = "/etc/zabbix_stats.cfg"
    try:
        opts, _unused_args = getopt.getopt(argv, "hc:", ["configfile="])
    except getopt.GetoptError:
        print(usage)
        sys.exit(2)
    for opt, arg in opts:
        if opt == '-h':
            print(usage)
            sys.exit(2)
        elif opt in ("-c", "--configfile"):
            cfgfile = arg

    pp = pprint.PrettyPrinter(indent=4)

    config = ConfigParser.ConfigParser()
    # read() returns the list of files it managed to parse and silently
    # skips missing ones; fail here rather than with a NoSectionError below.
    if not config.read(cfgfile):
        print('Cannot read config file: %s' % cfgfile)
        sys.exit(1)

    zapi = ZabbixAPI(config.get('Server', 'zabbixurl'))
    # NOTE(review): TLS certificate verification is disabled here; confirm
    # this is intentional for the target deployment.
    zapi.session.verify = False
    # Login to the Zabbix API
    zapi.login(config.get('Server', 'username'),
               config.get('Server', 'password'))

    while True:
        Alerts()
开发者ID:Fishwaldo,项目名称:LEDMessageBoard,代码行数:28,代码来源:zabbix_stats.py
示例5: main
def main():
    """Interactively align each Zabbix host's IP with its DNS record.

    Prompts for credentials, logs in, then walks every host returned by
    ``host.get(extendoutput=True)``:

    * reports hosts whose ``dns`` differs from ``host``;
    * skips hosts with ``useip == '1'``;
    * resolves ``dns`` and, when the result differs from the stored ``ip``,
      calls ``update_host_ip`` with the resolved address.
    """
    # Enter administrator credentials for the Zabbix Web Frontend
    username = raw_input('Username: ')
    password = getpass()

    zapi = ZabbixAPI(zabbix_server, username, password)
    zapi.login(username, password)
    print("Connected to Zabbix API Version %s" % zapi.api_version())

    for h in zapi.host.get(extendoutput=True):
        # Make sure the hosts are named according to their FQDN
        if h['dns'] != h['host']:
            print('%s is actually connecting to %s.' % (h['host'], h['dns']))

        # Make sure they are using hostnames to connect rather than IPs
        if h['useip'] == '1':
            print('%s is using IP instead of hostname. Skipping.' % h['host'])
            continue

        # Set their IP record to match what the DNS system says it should be
        try:
            lookup = socket.gethostbyaddr(h['dns'])
        except (socket.gaierror, socket.herror) as e:
            # gethostbyaddr reports failures as either gaierror or herror;
            # both mean "cannot resolve", so report the host and move on.
            print('%s %s' % (h['dns'], e))
            continue

        # lookup is (hostname, aliaslist, ipaddrlist); take the first address.
        actual_ip = lookup[2][0]
        if actual_ip != h['ip']:
            print("%s has the wrong IP: %s. Changing it to: %s"
                  % (h['host'], h['ip'], actual_ip))
            update_host_ip(zapi, h['hostid'], actual_ip)
开发者ID:bladealslayer,项目名称:pyzabbix,代码行数:30,代码来源:fix_host_ips.py
示例6: test_host_get
def test_host_get(self):
    """``host.get`` posts the expected JSON-RPC call and returns ``result``."""
    # NOTE(review): presumably httpretty is activated for this test by a
    # decorator or fixture outside this snippet -- confirm.
    httpretty.register_uri(
        httpretty.POST,
        "http://example.com/api_jsonrpc.php",
        body=json.dumps({
            "jsonrpc": "2.0",
            "result": [{"hostid": 1234}],
            "id": 0
        }),
    )

    zapi = ZabbixAPI('http://example.com')
    zapi.auth = "123"
    result = zapi.host.get()

    # Check request. Compare the parsed payload rather than its serialized
    # text so the assertion does not depend on key order or on the body
    # being bytes versus str.
    self.assertEqual(
        json.loads(httpretty.last_request().body.decode('utf-8')),
        {
            'jsonrpc': '2.0',
            'method': 'host.get',
            'params': {},
            'auth': '123',
            'id': 0,
        }
    )

    # Check response
    self.assertEqual(result, [{"hostid": 1234}])
开发者ID:gitter-badger,项目名称:pyzabbix,代码行数:29,代码来源:test_api.py
示例7: test_host_delete
def test_host_delete(self):
    """``host.delete`` sends positional ids as a JSON-RPC params list."""
    # NOTE(review): presumably httpretty is activated for this test by a
    # decorator or fixture outside this snippet -- confirm.
    httpretty.register_uri(
        httpretty.POST,
        "http://example.com/api_jsonrpc.php",
        body=json.dumps({
            "jsonrpc": "2.0",
            "result": {
                "itemids": [
                    "22982",
                    "22986"
                ]
            },
            "id": 0
        }),
    )

    zapi = ZabbixAPI('http://example.com')
    zapi.auth = "123"
    result = zapi.host.delete("22982", "22986")

    # Check request. Compare the parsed payload rather than its serialized
    # text so the assertion does not depend on key order or on the body
    # being bytes versus str.
    self.assertEqual(
        json.loads(httpretty.last_request().body.decode('utf-8')),
        {
            'jsonrpc': '2.0',
            'method': 'host.delete',
            'params': ["22982", "22986"],
            'auth': '123',
            'id': 0,
        }
    )

    # Check response (order of the returned ids is not significant)
    self.assertEqual(set(result["itemids"]), set(["22982", "22986"]))
开发者ID:gitter-badger,项目名称:pyzabbix,代码行数:34,代码来源:test_api.py
示例8: test_login
def test_login(self):
    """``login`` posts ``user.login`` and stores the returned token."""
    # NOTE(review): presumably httpretty is activated for this test by a
    # decorator or fixture outside this snippet -- confirm.
    httpretty.register_uri(
        httpretty.POST,
        "http://example.com/api_jsonrpc.php",
        body=json.dumps({
            "jsonrpc": "2.0",
            "result": "0424bd59b807674191e7d77572075f33",
            "id": 0
        }),
    )

    zapi = ZabbixAPI('http://example.com')
    zapi.login('mylogin', 'mypass')

    last_request = httpretty.last_request()

    # Check request. Compare the parsed payload rather than its serialized
    # text so the assertion does not depend on key order or on the body
    # being bytes versus str.
    self.assertEqual(
        json.loads(last_request.body.decode('utf-8')),
        {
            'jsonrpc': '2.0',
            'method': 'user.login',
            'params': {'user': 'mylogin', 'password': 'mypass'},
            'id': 0,
        }
    )
    self.assertEqual(
        last_request.headers['content-type'],
        'application/json-rpc'
    )
    self.assertEqual(
        last_request.headers['user-agent'],
        'python/pyzabbix'
    )

    # Check response
    self.assertEqual(zapi.auth, "0424bd59b807674191e7d77572075f33")
开发者ID:gitter-badger,项目名称:pyzabbix,代码行数:35,代码来源:test_api.py
示例9: ZabbixUtils
class ZabbixUtils(object):
    """Reads hosts and item values from Zabbix and names them as dotted paths."""

    def __init__(self):
        """Open a logged-in ``ZabbixAPI`` session using ``CONF.zabbix``."""
        s = requests.Session()
        s.auth = (CONF.zabbix.username, CONF.zabbix.password)
        # NOTE(review): TLS certificate verification is disabled here;
        # confirm this is intentional.
        s.verify = False

        self.zapi = ZabbixAPI(CONF.zabbix.url, s)
        self.zapi.login(CONF.zabbix.username, CONF.zabbix.password)

    def set_graphite_format(self, s):
        """Return ``s`` lowercased with all whitespace removed."""
        return ''.join(s.split()).lower()

    def get_hosts_per_hostgroup(self):
        """Return a dict mapping each host group name to its host names."""
        hostgroups = self.zapi.hostgroup.get(real_hosts=1,
                                             output="extend",
                                             selectHosts="extend")
        return dict(
            (hostgroup["name"], [host["name"] for host in hostgroup["hosts"]])
            for hostgroup in hostgroups
        )

    def get_items_by_host(self, h, filters=None):
        """Return a list of ``[metric, value]`` pairs for one host.

        Args:
            h: Zabbix's name for the host.
            filters: Iterable of item-key search strings. ``None`` (the
                default) means no filters, which yields no items.

        Returns:
            One ``["<group>.<host>.<key>", lastvalue]`` entry per matching
            item and per group of the host, with each path component passed
            through ``set_graphite_format``.
        """
        # None instead of a shared mutable [] default.
        if filters is None:
            filters = ()

        # Keyed by itemid so an item matched by several filters appears once.
        items_by_id = {}
        for key_filter in filters:
            for item in self.zapi.item.get(search={"key_": key_filter},
                                           host=h,
                                           output=["itemid",
                                                   "name",
                                                   "lastvalue",
                                                   "key_"]):
                itemid = item.pop("itemid")
                items_by_id[itemid] = item

        key_value_pairs = [[m["key_"], m["lastvalue"]]
                           for m in items_by_id.values()]

        # Groups of the first host returned by the name search.
        hostgroups = [group["name"] for group in self.zapi.host.get(
            search={"name": h},
            output="groups",
            selectGroups="extend")[0]["groups"]]

        h = self.set_graphite_format(h)
        l_metric = []
        for hgroup in hostgroups:
            hgroup = self.set_graphite_format(hgroup)
            for key, value in key_value_pairs:
                k = self.set_graphite_format(key)
                l_metric.append([".".join([hgroup, h, k]), value])

        return l_metric
开发者ID:orviz,项目名称:z2g,代码行数:60,代码来源:zabbix.py
示例10: initiate_connection
def initiate_connection():
    """Create the module-level Zabbix ``connection`` and log it in.

    Reads the URL and credentials from ``settings`` and rebinds the global
    ``connection`` to a new ``ZabbixAPI`` before logging in.
    """
    global connection
    logger.error('Initiating connection')
    credentials = (settings.ZABBIX_USER, settings.ZABBIX_PASSWORD)
    connection = ZabbixAPI(server=settings.ZABBIX_URL)
    connection.login(*credentials)
开发者ID:4i60r,项目名称:ralph,代码行数:7,代码来源:zabbix.py
示例11: _connect
def _connect(self):
    """Build and return a logged-in ``ZabbixAPI`` using ``CONF.zabbix``.

    The underlying ``requests`` session carries the same credentials as its
    ``auth`` tuple and has certificate verification switched off.
    """
    username = CONF.zabbix.username
    password = CONF.zabbix.password

    session = requests.Session()
    session.auth = (username, password)
    session.verify = False

    api = ZabbixAPI(CONF.zabbix.url, session)
    api.login(CONF.zabbix.username, CONF.zabbix.password)
    return api
开发者ID:orviz,项目名称:delato,代码行数:8,代码来源:zabbix.py
示例12: auth_zabbix
def auth_zabbix(server_ip, username="Admin", password="zabbix"):
    """Log in to the Zabbix frontend at ``http://<server_ip>/zabbix``.

    Args:
        server_ip: Host part of the frontend URL.
        username: Login name.
        password: Login password.

    Returns:
        The logged-in ``ZabbixAPI`` handle.
    """
    logging.debug("Authentication - Zabbix Server")
    frontend_url = "http://" + server_ip + "/zabbix"
    api = ZabbixAPI(frontend_url)
    api.login(username, password)
    version_message = "Connected to Zabbix API Version %s" % api.api_version()
    logging.debug(version_message)
    return api
开发者ID:eideo,项目名称:big_data_learning,代码行数:8,代码来源:zabbix_api_test_host.py
示例13: test_get_api_version
def test_get_api_version(self):
    """``api_version()`` matches the release selected by ``ZABBIX_VERSION``."""
    zapi = ZabbixAPI(url='http://127.0.0.1',
                     user='Admin',
                     password='zabbix')

    # Expected API version for each supported ZABBIX_VERSION value.
    expected_versions = {'3': '3.0.1', '2': '2.4.7'}

    major = os.environ['ZABBIX_VERSION']
    if major not in expected_versions:
        self.fail('api_version() not tested!')
    self.assertEqual(zapi.api_version(), expected_versions[major])
开发者ID:phantasmzyh,项目名称:py-zabbix,代码行数:10,代码来源:test_Functional_API.py
示例14: get_zabbix_api
def get_zabbix_api():
    """Return a ``ZabbixAPI`` handle, or ``None`` when updates are disabled.

    When ``zabbix_comment_update`` is truthy a handle for ``zabbix_url`` is
    created and a login is attempted; a login failure is printed and the
    handle is still returned.
    """
    if not zabbix_comment_update:
        return None

    api = ZabbixAPI(zabbix_url)
    try:
        api.login(zabbix_user, zabbix_password)
    except Exception as login_error:
        print('some errors occurs')
        print(login_error)
    return api
开发者ID:atoato88,项目名称:register_zabbix_agent,代码行数:10,代码来源:register_agent.py
示例15: ZabbixClinet
class ZabbixClinet(object):
    """Logged-in Zabbix API client bound to one IDC and server address."""

    def __init__(self, idc, address, username, password):
        """Connect to ``address`` and log in.

        Args:
            idc: Identifier stored on the instance as ``self.idc``.
            address: Zabbix API URL; also stored as ``self.address``.
            username: Login name.
            password: Login password.

        Raises:
            Exception: If creating the API handle or logging in fails. The
                message carries the address and the underlying error.
        """
        self.idc = idc
        self.address = address
        try:
            self.zbx_api = ZabbixAPI(address)
            self.zbx_api.login(username, password)
        except Exception as error:
            # Still raises plain Exception so existing callers keep working,
            # but no longer throws away the reason for the failure.
            raise Exception(
                'Failed to connect to Zabbix API at %s: %s' % (address, error)
            )
开发者ID:zouyapeng,项目名称:zbxui,代码行数:10,代码来源:views.py
示例16: _get_zabbix
def _get_zabbix(server=None, password=None, user=None):
    """Return a ``MonitorZabbix`` wrapper around a logged-in ``ZabbixAPI``.

    Each argument falls back to an environment variable (``ZABBIX_SERVER``,
    ``ZABBIX_USER``, ``ZABBIX_PASSWORD``) and then to a built-in default
    when it is falsy. A server value without an ``http``-style scheme
    prefix gets ``http://`` prepended.
    """
    from pyzabbix import ZabbixAPI
    import thoth.zabbix as zabbix

    zabbix_server = server or os.environ.get('ZABBIX_SERVER', 'http://127.0.0.1')
    zabbix_user = user or os.environ.get('ZABBIX_USER', 'Admin')
    zabbix_password = password or os.environ.get('ZABBIX_PASSWORD', 'zabbix')

    if zabbix_server:
        has_scheme = match('^http[s]*://', zabbix_server)
        if not has_scheme:
            zabbix_server = 'http://' + zabbix_server

    api = ZabbixAPI(zabbix_server)
    api.login(zabbix_user, zabbix_password)
    return zabbix.MonitorZabbix(api)
开发者ID:hkumarmk,项目名称:thoth,代码行数:11,代码来源:__init__.py
示例17: set_zabbix_handle
def set_zabbix_handle(self, name, host, user, password, ssl_verify):
    """Return the Zabbix handle cached under ``name``, creating it if needed.

    Args:
        name: Key under which the handle is cached in ``self.zabbix_handles``.
        host: Zabbix API URL used when a new handle has to be created.
        user: Login name.
        password: Login password.
        ssl_verify: Value assigned to the session's ``verify`` attribute.

    Returns:
        The cached or newly created ``ZabbixAPI`` handle.
    """
    cached = self.zabbix_handles.get(name)
    if cached:
        # The cache is written under ``name``, so it must be read under
        # ``name`` as well (it used to be read under ``host``).
        return cached

    zapi = ZabbixAPI(host)
    zapi.session.verify = ssl_verify
    zapi.login(user, password)
    # Record the server and its auth token alongside caching the handle.
    self.Server.create(host=host, token=zapi.auth)
    self.zabbix_handles[name] = zapi
    return zapi
开发者ID:teriyakichild,项目名称:zabbixdash,代码行数:12,代码来源:base.py
示例18: sample
def sample(self):
    """Fetch one ``lastvalue`` reading and return the current point window.

    Logs in to the Zabbix API, reads the first item matching the
    ``zabbix[wcache,values]`` key search on host ``zabbix.tonyrogers.me``,
    appends it as an ``{'x', 'y'}`` point and keeps at most 10 points.

    Returns:
        ``{'points': [...]}`` with the retained points.
    """
    api = ZabbixAPI("http://zabbix.tonyrogers.me/zabbix/")
    api.session.verify = False
    api.login("Admin", "zabbix")

    readings = api.item.get(
        output=['lastvalue'],
        filter={'host': 'zabbix.tonyrogers.me'},
        search={'key_': 'zabbix[wcache,values]'}
    )

    point = {'x': self.seedX, 'y': readings[0]['lastvalue']}
    self.items.append(point)
    self.seedX += 1

    # Drop the oldest point once the window grows past 10 entries.
    if len(self.items) > 10:
        self.items.popleft()

    return {'points': list(self.items)}
开发者ID:teriyakichild,项目名称:pydashie,代码行数:12,代码来源:example_samplers.py
示例19: disable_on_zabbix
def disable_on_zabbix(instance_name):
    """Set ``status=1`` on the Zabbix host named ``instance_name``.

    Does nothing when no host matches the name filter or when the first
    match has no ``hostid``.
    """
    zapi = ZabbixAPI(zabbix_api)
    zapi.login(zabbix_user, zabbix_pass)

    matches = zapi.host.get(filter={"name": instance_name})
    if not (len(matches) > 0 and 'hostid' in matches[0]):
        return

    hostid = matches[0]['hostid']
    print("Disabling on Zabbix: %s %s" % (instance_name, hostid))
    zapi.host.update(hostid=hostid, status=1)
开发者ID:akomic,项目名称:ec2-event-handler,代码行数:12,代码来源:ec2_term_handler.py
示例20: authentication
def authentication(server_url, user, password):
    """Log in to the Zabbix API and return the handle.

    Args:
        server_url: Zabbix API URL.
        user: Login name.
        password: Login password.

    Returns:
        The logged-in ``ZabbixAPI`` handle, or ``None`` when any argument
        is falsy (no connection is attempted in that case).

    On a login failure the error is printed and the process exits with
    status 1.
    """
    if not (server_url and user and password):
        # Same result as before, when the function simply fell off the end.
        return None

    zapi = ZabbixAPI(server_url)
    try:
        # Login to the Zabbix API
        zapi.login(user, password)
    except Exception as e:
        print(e)
        sys.exit(1)
    return zapi
开发者ID:afareg,项目名称:zabbix-web-scenario,代码行数:12,代码来源:zabbix_web_scenario.py
注:本文中的pyzabbix.ZabbixAPI类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论