• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python socket.gethostname函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中socket.gethostname函数的典型用法代码示例。如果您正苦于以下问题:Python gethostname函数的具体用法?Python gethostname怎么用?Python gethostname使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了gethostname函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _machine_id

def _machine_id():
    """
    for informational purposes, try to get a machine unique id thing
    """
    if platform.isLinux():
        try:
            # why this? see: http://0pointer.de/blog/projects/ids.html
            with open('/var/lib/dbus/machine-id', 'r') as f:
                return f.read().strip()
        except:
            # Non-dbus using Linux, get a hostname
            return socket.gethostname()

    elif platform.isMacOSX():
        # Get the serial number of the platform
        import plistlib
        plist_data = subprocess.check_output(["ioreg", "-rd1", "-c", "IOPlatformExpertDevice", "-a"])

        if six.PY2:
            # Only API on 2.7
            return plistlib.readPlistFromString(plist_data)[0]["IOPlatformSerialNumber"]
        else:
            # New, non-deprecated 3.4+ API
            return plistlib.loads(plist_data)[0]["IOPlatformSerialNumber"]

    else:
        # Something else, just get a hostname
        return socket.gethostname()
开发者ID:NinjaMSP,项目名称:crossbar,代码行数:28,代码来源:node.py


示例2: rebuild

def rebuild(tokudb, builddir, tokudb_data, cc, cxx, tests):
    info('Building tokudb.')
    if not os.path.exists(builddir):
        os.mkdir(builddir)
    newenv = os.environ
    newenv['CC'] = cc
    newenv['CXX'] = cxx
    r = call(['cmake',
              '-DCMAKE_BUILD_TYPE=Debug',
              '-DUSE_GTAGS=OFF',
              '-DUSE_CTAGS=OFF',
              '-DUSE_ETAGS=OFF',
              '-DUSE_CSCOPE=OFF',
              '-DTOKUDB_DATA=%s' % tokudb_data,
              tokudb],
             env=newenv,
             cwd=builddir)
    if r != 0:
        send_mail(['[email protected]'], 'Stress tests on %s failed to build.' % gethostname(), '')
        error('Building the tests failed.')
        sys.exit(r)
    r = call(['make', '-j8'], cwd=builddir)
    if r != 0:
        send_mail(['[email protected]'], 'Stress tests on %s failed to build.' % gethostname(), '')
        error('Building the tests failed.')
        sys.exit(r)
开发者ID:SunguckLee,项目名称:MariaDB-PageCompression,代码行数:26,代码来源:run.stress-tests.py


示例3: __init__

    def __init__(self, data_set_name, fixed_tags=None, variable_tags=None, data=None):
        self.data_set_name = str(data_set_name)

        # fixed_tags can optionally be provided in a dict upong DS declaration
        if fixed_tags==None:
            self.fixed_tags={'host':gethostname()}
        elif type(fixed_tags) is dict:
            self.fixed_tags = fixed_tags
            self.fixed_tags['host']=gethostname()
        else:
            print "ERROR: if passing fixed_tags, type must be a dict!"
            exit(1)

        # variable_tags can optionally be provided in a dict upong DS declaration
        if variable_tags==None:
            self.variable_tags = []
            if data != None: self.variable_tags.append({})
        elif type(variable_tags) is dict:
            self.variable_tags = [variable_tags]
        else:
            print "ERROR: if passing variable_tags, type must be a dict!"
            exit(1)

        # data can optionally be provided in a dict upon DS declaration
        if data==None:
            self.data = []
            if variable_tags != None: self.data.append({})
        elif type(data) is dict:
            self.data = [data]
        else:
            print "ERROR: if passing data, type must be a dict!"
            exit(1)
开发者ID:thehoff,项目名称:monitoring-project,代码行数:32,代码来源:output_module.py


示例4: no_test_userandaddressinit

    def no_test_userandaddressinit(self):
        server = 'http://demo.opencastproject.org'
        user = 'matterhorn_system_account';
        password = 'CHANGE_ME';
        hostname = 'rubenruaHost'
        address = '8.8.8.8'
        workflow = 'mini-full'
        workflow_parameters = 'uno:uno'
        workflow_parameters_2 = 'uno:uno;dos:dos'

        client = MHHTTPClient(server, user, password)
        self.assertEqual(client.hostname, 'GC-' + socket.gethostname())
        self.assertEqual(client.address, socket.gethostbyname(socket.gethostname()))

        client = MHHTTPClient(server, user, password, hostname)
        self.assertEqual(client.hostname, hostname)
        self.assertEqual(client.address, socket.gethostbyname(socket.gethostname()))

        client = MHHTTPClient(server, user, password, hostname, address)
        self.assertEqual(client.hostname, hostname)
        self.assertEqual(client.address, address)

        client = MHHTTPClient(server, user, password, hostname, address)
        self.assertEqual(client.workflow, 'full')
        self.assertEqual(client.workflow_parameters, {'trimHold': 'true'})

        client = MHHTTPClient(server, user, password, hostname, address, workflow, workflow_parameters)
        self.assertEqual(client.workflow, 'mini-full')
        self.assertEqual(client.workflow_parameters, {'uno': 'uno'})

        client = MHHTTPClient(server, user, password, hostname, address, workflow, workflow_parameters_2)
        self.assertEqual(client.workflow, 'mini-full')
        self.assertEqual(client.workflow_parameters, {'uno': 'uno', 'dos': 'dos'})
开发者ID:hectorcanto,项目名称:Galicaster,代码行数:33,代码来源:mh_client.py


示例5: set_ip

def set_ip(host_ip):
    print ""
    print ""
    sys.stdout.write("SET IP STATIC ADRESS  :"+host_ip[0]+"     ")
    logging.info( "SET IP ADRESS  :"+host_ip[0])
    get_ip=socket.gethostbyname(socket.gethostname())
    if get_ip!=host_ip[0]:
        os.system('netsh interface ipv4 set address name="Ethernet" source=static address='+host_ip[0]+' mask='+host_ip[1]+' gateway='+host_ip[2])
        i=0        
        while get_ip != host_ip[0] :
            ip_t=socket.gethostbyname(socket.gethostname())
            if ip_t!='127.0.0.1':
                get_ip=ip_t
            
            if i==0:
                sys.stdout.write("Processing ") 
            else:
                sys.stdout.write(".")
            time.sleep(1)
            if i < 60: 
                i = i + 1
            else:
                sys.stdout.write( "    WAIT "+str(i)+" SEC STOPPED, IP ADDRESS :"+get_ip)
                break      
        sys.stdout.write("OK, SET STATIC IP ADRESS HOST :"+get_ip)
        logging.info("OK, SET STATIC IP ADRESS HOST :"+get_ip)
    else:
        sys.stdout.write("IP :"+str(host_ip)+" established")
        logging.error("IP :"+str(host_ip)+" established")
开发者ID:sav1978,项目名称:borey-test,代码行数:29,代码来源:set_ip_cam.py


示例6: links

def links(*screen):
    synergyconf = open(os.path.join(os.path.expanduser("~"), ".synergy.conf"), 'a')
    synergyconf.write('section: links\n')
    for y in screen[0]:
        if y is not None: ## If the comboBox is not empty
            subnames.append(y.split('_'))
        else:
            pass
    ### writing the host part ####
    synergyconf.write("\t%s:\n" % gethostname())
    for z in subnames:
            if z[0] == 'host':
                pass
            else:
                synergyconf.write("\t%s = %s\n" % (z[0], z[2]))

    ### writing the client part ###
    for clients in subnames:
        if clients[2] == gethostname():
            pass
        else:
            synergyconf.write("\t%s:\n" % clients[2])
            synergyconf.write("\t\t%s = %s\n" % (clients[1], gethostname()))

    synergyconf.write('end\n')
    synergyconf.close()
开发者ID:dhirajkhatiwada1,项目名称:uludag,代码行数:26,代码来源:createsynergyconf.py


示例7: create_uid_email

def create_uid_email(username=None, hostname=None):
    """Create an email address suitable for a UID on a GnuPG key.

    :param str username: The username portion of an email address.  If None,
                         defaults to the username of the running Python
                         process.

    :param str hostname: The FQDN portion of an email address. If None, the
                         hostname is obtained from gethostname(2).

    :rtype: str
    :returns: A string formatted as <username>@<hostname>.
    """
    if hostname:
        hostname = hostname.replace(' ', '_')
    if not username:
        try: username = os.environ['LOGNAME']
        except KeyError: username = os.environ['USERNAME']

        if not hostname: hostname = gethostname()

        uid = "%[email protected]%s" % (username.replace(' ', '_'), hostname)
    else:
        username = username.replace(' ', '_')
        if (not hostname) and (username.find('@') == 0):
            uid = "%[email protected]%s" % (username, gethostname())
        elif hostname:
            uid = "%[email protected]%s" % (username, hostname)
        else:
            uid = username

    return uid
开发者ID:micahflee,项目名称:python-gnupg,代码行数:32,代码来源:_util.py


示例8: sendCommunicationAlertClear

	def sendCommunicationAlertClear(self):

		if not self.communicationAlertSent:
			return True

		subject = "[alertR] (%s) Communication problems solved" \
			% socket.gethostname()

		message = "The communication problems between the host '%s' " \
			% socket.gethostname() \
			+ "and the server were solved."

		emailHeader = "From: %s\r\nTo: %s\r\nSubject: %s\r\n" \
			% (self.fromAddr, self.toAddr, subject)

		# sending eMail alert to configured smtp server
		logging.info("[%s]: Sending eMail alert to %s." 
			% (self.fileName, self.toAddr))
		try:
			smtpServer = smtplib.SMTP(self.host, self.port)
			smtpServer.sendmail(self.fromAddr, self.toAddr, 
				emailHeader + message)
			smtpServer.quit()
		except Exception as e:
			logging.exception("[%s]: Unable to send eMail alert. " 
				% self.fileName)
			return False

		# clear flag that a communication alert was sent before exiting
		self.communicationAlertSent = False

		return True
开发者ID:catding,项目名称:alertR,代码行数:32,代码来源:smtp.py


示例9: main

def main():
    components = []

    processes, license = read_config()
    pids = find_pid(processes)

    LOG.info("Watching process:" )

    for pid in pids:
        collection = []
        metrics = dict()
        process = psutil.Process(pid.pid)
    
        collection.append(get_cpu_stats(process))
        collection.append(get_net_stats(process))
        collection.append(get_vm_stats(process))
        collection.append(get_io_stats(process))
    
        for metric in collection:
            for data in metric.metrics:
                namespace = ''
                unit = ''
                section = metric.t_class
                name = data.name
    
                if data.namespace:
                    namespace = data.namespace + "/"
    
                if data.unit:
                    unit = "[" + data.unit + "]"
    
                key = "Component/%(section)s/%(namespace)s%(name)s%(unit)s" % locals()
                value = data.metric
                metrics[key] = value

        component_template = {
            "name": "%s-%s-%s" % (str(pid.pid), pid.name(), gethostname()),
            "guid": GUID,
            "duration" : 60,
            "metrics" : metrics       
        }

        components.append(component_template)
    
    
    payload = {
      "agent": {
        "host" : gethostname(),
        "pid" : getpid(),
        "version" : "1.0.0"
      },
      "components": components
    }
    
    LOG.debug("Sending payload\n: %s", payload)
    
    headers = {"X-License-Key": license, "Content-Type":"application/json", "Accept":"application/json"}
    request = requests.post(url=URL, headers=headers, data=json.dumps(payload))
    LOG.debug("Newrelic response code: %s" ,request.status_code)
    print request.text
开发者ID:limitusus,项目名称:newrelic_procstat,代码行数:60,代码来源:procstat.py


示例10: __init__

	def __init__(self, debug=False):
		self.debug  = debug
		if self.debug:
			logLevel = logging.DEBUG
		else:
			logLevel = logging.INFO
		logging.basicConfig(format='%(levelname)s: %(message)s', level=logLevel)

		if not os.path.isdir(self.OUTPUT_DIR):
		 	os.mkdir(self.OUTPUT_DIR)
		if not os.path.isdir(self.UNPROCESSED_DIR):
		 	os.mkdir(self.UNPROCESSED_DIR)
		if not os.path.isdir(self.PROCESSED_DIR):
		 	os.mkdir(self.PROCESSED_DIR)
		if not os.path.isdir(self.PROCESSING_DIR):
		 	os.mkdir(self.PROCESSING_DIR)
		self.db = DB(debug = self.debug)
		self.rsync = rsync( self.debug )

		if socket.gethostname() == 'cet-sif':
			self.OnServer = True
			# self.host = socket.gethostbyname( 'cet-research.colorado.edu' )
			self.host = '128.138.248.205'
		else:
			self.OnServer = False
			self.host = socket.gethostbyname(socket.gethostname())
开发者ID:Capstone-SIF,项目名称:SIFServer,代码行数:26,代码来源:socketServer.py


示例11: sendCommunicationAlert

	def sendCommunicationAlert(self, connectionRetries):

		if self.communicationAlertSent:
			return True

		subject = "[alertR] (%s) Communication problems detected" \
			% socket.gethostname()

		message = "Communication problems detected between the host '%s' " \
			% socket.gethostname() \
			+ "and the server. The host '%s' was not able to establish " \
			% socket.gethostname() \
			+ "a connection after %d retries." \
			% connectionRetries

		emailHeader = "From: %s\r\nTo: %s\r\nSubject: %s\r\n" \
			% (self.fromAddr, self.toAddr, subject)

		# sending eMail alert to configured smtp server
		logging.info("[%s]: Sending eMail alert to %s." 
			% (self.fileName, self.toAddr))
		try:
			smtpServer = smtplib.SMTP(self.host, self.port)
			smtpServer.sendmail(self.fromAddr, self.toAddr, 
				emailHeader + message)
			smtpServer.quit()
		except Exception as e:
			logging.exception("[%s]: Unable to send eMail alert. " 
				% self.fileName)
			return False

		# set flag that a communication alert was sent before exiting
		self.communicationAlertSent = True

		return True
开发者ID:catding,项目名称:alertR,代码行数:35,代码来源:smtp.py


示例12: test_bulkadd

 def test_bulkadd(self):
     app_id = "test_app"
     bulk_add_request = taskqueue_service_pb.TaskQueueBulkAddRequest()
     item = bulk_add_request.add_add_request()
     item.set_app_id(app_id)
     item.set_queue_name("default")
     item.set_task_name("babaganoose")
     item.set_method(taskqueue_service_pb.TaskQueueAddRequest.GET)
     item.set_mode(taskqueue_service_pb.TaskQueueMode.PUSH)
     item.set_eta_usec(5000000)  # 5 seconds
     host = socket.gethostbyname(socket.gethostname())
     item.set_url("http://" + host + ":64839/queues")
     host = socket.gethostbyname(socket.gethostname())
     req = urllib2.Request("http://" + host + ":64839")
     api_request = remote_api_pb.Request()
     api_request.set_method("BulkAdd")
     api_request.set_service_name("taskqueue")
     api_request.set_request(bulk_add_request.Encode())
     remote_request = api_request.Encode()
     req.add_header("Content-Length", str(len(remote_request)))
     req.add_header("protocolbuffertype", "Request")
     req.add_header("appdata", app_id)
     response = urllib2.urlopen(req, remote_request)
     api_response = response.read()
     api_response = remote_api_pb.Response(api_response)
     bulk_add_response = taskqueue_service_pb.TaskQueueBulkAddResponse(api_response.response())
     print bulk_add_response
     self.assertEquals(response.getcode(), 200)
开发者ID:mackjoner,项目名称:appscale,代码行数:28,代码来源:test_tq_add_delayed_task.py


示例13: test_bulkadd

 def test_bulkadd(self):
   app_id = 'test_app'
   bulk_add_request = taskqueue_service_pb.TaskQueueBulkAddRequest()
   item = bulk_add_request.add_add_request()
   item.set_app_id(app_id)
   item.set_queue_name('default') 
   item.set_task_name('babaganoose')
   item.set_eta_usec(0)
   item.set_method(taskqueue_service_pb.TaskQueueAddRequest.GET)
   item.set_mode(taskqueue_service_pb.TaskQueueMode.PUSH)
   host = socket.gethostbyname(socket.gethostname())
   item.set_url('http://' + host + ':64839/queues') 
   host = socket.gethostbyname(socket.gethostname())
   req = urllib2.Request('http://' + host + ':64839')
   api_request = remote_api_pb.Request()
   api_request.set_method("BulkAdd")
   api_request.set_service_name("taskqueue")
   api_request.set_request(bulk_add_request.Encode())
   remote_request = api_request.Encode()
   req.add_header('Content-Length', str(len(remote_request)))
   req.add_header('protocolbuffertype', 'Request') 
   req.add_header('appdata', app_id) 
   response = urllib2.urlopen(req, remote_request)
   api_response = response.read()
   api_response = remote_api_pb.Response(api_response) 
   bulk_add_response = taskqueue_service_pb.TaskQueueBulkAddResponse(api_response.response())
   print bulk_add_response
   self.assertEquals(response.getcode(), 200)
开发者ID:GavinHwa,项目名称:appscale,代码行数:28,代码来源:test_tq_add_task.py


示例14: network_details

    def network_details():
        """
        Returns details about the network links
        """
        # Get IPv4 details
        ipv4_addresses = [info[4][0] for info in socket.getaddrinfo(
            socket.gethostname(), None, socket.AF_INET)]

        # Add localhost
        ipv4_addresses.extend(info[4][0] for info in socket.getaddrinfo(
            "localhost", None, socket.AF_INET))

        # Filter addresses
        ipv4_addresses = sorted(set(ipv4_addresses))

        try:
            # Get IPv6 details
            ipv6_addresses = [info[4][0] for info in socket.getaddrinfo(
                socket.gethostname(), None, socket.AF_INET6)]

            # Add localhost
            ipv6_addresses.extend(info[4][0] for info in socket.getaddrinfo(
                "localhost", None, socket.AF_INET6))

            # Filter addresses
            ipv6_addresses = sorted(set(ipv6_addresses))
        except (socket.gaierror, AttributeError):
            # AttributeError: AF_INET6 is missing in some versions of Python
            ipv6_addresses = None

        return {"IPv4": ipv4_addresses, "IPv6": ipv6_addresses,
                "host.name": socket.gethostname(),
                "host.fqdn": socket.getfqdn()}
开发者ID:cotyb,项目名称:ipopo,代码行数:33,代码来源:report.py


示例15: get_ip

def get_ip():
    """Provides an available network interface address, for example
       "192.168.1.3".

       A `NetworkError` exception is raised in case of failure."""
    try:
        try:
            ip = socket.gethostbyname(socket.gethostname())
        except socket.gaierror:  # for Mac OS X
            ip = socket.gethostbyname(socket.gethostname() + ".local")
    except socket.gaierror:
        # sometimes the hostname doesn't resolve to an ip address, in which
        # case this will always fail
        ip = None

    if (ip is None or ip.startswith("127.")) and mozinfo.isLinux:
        interfaces = _get_interface_list()
        for ifconfig in interfaces:
            if ifconfig[0] == 'lo':
                continue
            else:
                return ifconfig[1]

    if ip is None:
        raise NetworkError('Unable to obtain network address')

    return ip
开发者ID:at13,项目名称:mozilla-central,代码行数:27,代码来源:moznetwork.py


示例16: api_publish

    def api_publish(self, end_point = None):
        self._debug['msg_pubs'] += 1
        ctype = bottle.request.headers['content-type']
        json_req = {}
        if ctype == 'application/json':
            data = bottle.request.json
            for service_type, info in data.items():
                json_req['name'] = service_type
                json_req['info'] = info
        elif ctype == 'application/xml':
            data = xmltodict.parse(bottle.request.body.read())
            for service_type, info in data.items():
                json_req['name'] = service_type
                json_req['info'] = dict(info)
        else:
            bottle.abort(400, e)

        sig = end_point or publisher_id(
                bottle.request.environ['REMOTE_ADDR'], json.dumps(json_req))

        # Rx {'name': u'ifmap-server', 'info': {u'ip_addr': u'10.84.7.1',
        # u'port': u'8443'}}
        info = json_req['info']
        service_type = json_req['name']

        entry = self._db_conn.lookup_service(service_type, service_id=sig)
        if not entry:
            entry = {
                'service_type': service_type,
                'service_id': sig,
                'in_use': 0,
                'ts_use': 0,
                'ts_created': int(time.time()),
                'prov_state': 'new',
                'remote': bottle.request.environ.get('REMOTE_ADDR'),
                'sequence': str(int(time.time())) + socket.gethostname(),
            }
        elif 'sequence' not in entry or self.service_expired(entry):
            # handle upgrade or republish after expiry
            entry['sequence'] = str(int(time.time())) + socket.gethostname()

        entry['info'] = info
        entry['admin_state'] = 'up'
        entry['heartbeat'] = int(time.time())

        # insert entry if new or timed out
        self._db_conn.update_service(service_type, sig, entry)

        response = {'cookie': sig + ':' + service_type}
        if ctype != 'application/json':
            response = xmltodict.unparse({'response': response})

        self.syslog('publish service "%s", sid=%s, info=%s'
                    % (service_type, sig, info))

        if not service_type.lower() in self.service_config:
            self.service_config[
                service_type.lower()] = self._args.default_service_opts

        return response
开发者ID:reiaaoyama,项目名称:contrail-controller,代码行数:60,代码来源:disc_server.py


示例17: on_message

    def on_message(self, headers, body):
        # FIXME, don't hardcode keepalive topic name:
        if headers["destination"] == "/topic/keepalive":
            logging.debug("got keepalive message")
            response = {
                "host": socket.gethostname(),
                "script": os.path.basename(__file__),
                "timestamp": datetime.now().isoformat(),
            }
            stomp.send(body=json.dumps(response), destination="/topic/keepalive_response")

            return ()

        debug_msg = {
            "script": os.path.basename(__file__),
            "host": socket.gethostname(),
            "timestamp": datetime.now().isoformat(),
            "message": "received message from %s, before thread" % headers["destination"],
        }
        stomp.send(body=json.dumps(debug_msg), destination="/topic/keepalive_response")

        logging.info("Received stomp message: {message}".format(message=body))
        received_obj = None
        try:
            received_obj = json.loads(body)
        except ValueError as e:
            logging.error("Received invalid JSON: %s." % body)
            return

        handle_builder_event(received_obj)
        logging.info("Destination: %s" % headers.get("destination"))

        # Acknowledge that the message has been processed
        self.message_received = True
开发者ID:Bioconductor,项目名称:spb_history,代码行数:34,代码来源:track_build_completion.py


示例18: test_check_for_setup_error

    def test_check_for_setup_error(self, vgs):
        vg_obj = fake_lvm.FakeBrickLVM('cinder-volumes',
                                       False,
                                       None,
                                       'default')

        configuration = conf.Configuration(fake_opt, 'fake_group')
        lvm_driver = lvm.LVMVolumeDriver(configuration=configuration,
                                         vg_obj=vg_obj, db=db)

        lvm_driver.delete_snapshot = mock.Mock()

        volume = tests_utils.create_volume(self.context,
                                           host=socket.gethostname())
        volume_id = volume['id']

        backup = {}
        backup['volume_id'] = volume_id
        backup['user_id'] = fake.USER_ID
        backup['project_id'] = fake.PROJECT_ID
        backup['host'] = socket.gethostname()
        backup['availability_zone'] = '1'
        backup['display_name'] = 'test_check_for_setup_error'
        backup['display_description'] = 'test_check_for_setup_error'
        backup['container'] = 'fake'
        backup['status'] = fields.BackupStatus.CREATING
        backup['fail_reason'] = ''
        backup['service'] = 'fake'
        backup['parent_id'] = None
        backup['size'] = 5 * 1024 * 1024
        backup['object_count'] = 22
        db.backup_create(self.context, backup)

        lvm_driver.check_for_setup_error()
开发者ID:Nexenta,项目名称:cinder,代码行数:34,代码来源:test_lvm_driver.py


示例19: get_node_name

def get_node_name():
    # Get all the nodes in the cluster
    cmd = 'kubectl --kubeconfig={} get no -o=json'.format(kubeconfig_path)
    cmd = cmd.split()
    deadline = time.time() + 60
    while time.time() < deadline:
        try:
            raw = check_output(cmd)
            break
        except CalledProcessError:
            hookenv.log('Failed to get node name for node %s.'
                        ' Will retry.' % (gethostname()))
            time.sleep(1)
    else:
        msg = 'Failed to get node name for node %s' % gethostname()
        raise GetNodeNameFailed(msg)

    result = json.loads(raw.decode('utf-8'))
    if 'items' in result:
        for node in result['items']:
            if 'status' not in node:
                continue
            if 'addresses' not in node['status']:
                continue

            # find the hostname
            for address in node['status']['addresses']:
                if address['type'] == 'Hostname':
                    if address['address'] == gethostname():
                        return node['metadata']['name']

                    # if we didn't match, just bail to the next node
                    break
    msg = 'Failed to get node name for node %s' % gethostname()
    raise GetNodeNameFailed(msg)
开发者ID:Amitabhkloud9,项目名称:kubernetes,代码行数:35,代码来源:kubernetes_worker.py


示例20: is_local

    def is_local(self, hosts):
        if 0 == len(self.local_names):
            self.local_names.append('localhost')
            self.local_names.append(socket.gethostname().lower());
            try:
                self.local_names.append(socket.gethostbyname_ex(socket.gethostname())[-1])
            except socket.gaierror:
                # TODO Append local IP address to local_names
                pass

        for s in hosts:
            s = s.lower()
            if s.startswith('127.') \
                    or s.startswith('192.168.') \
                    or s.startswith('10.') \
                    or s.startswith('169.254.') \
                    or s in self.local_names:
                print s
                return True

            for h in config.PROXY_HOSTS_ONLY:
                # if PROXY_HOSTS_ONLY is not empty
                # only proxy these hosts
                if s.endswith(h):
                    return False

        if len(config.PROXY_HOSTS_ONLY) > 0:
            return True
        else:
            return False
开发者ID:Libor87,项目名称:XX-Net,代码行数:30,代码来源:proxy_handler.py



注:本文中的socket.gethostname函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python socket.getnameinfo函数代码示例发布时间:2022-05-27
下一篇:
Python socket.gethostbyname_ex函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap