• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sync.ModbusTcpClient类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pymodbus.client.sync.ModbusTcpClient的典型用法代码示例。如果您正苦于以下问题:Python ModbusTcpClient类的具体用法?Python ModbusTcpClient怎么用?Python ModbusTcpClient使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ModbusTcpClient类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: testTcpClientRegister

 def testTcpClientRegister(self):
     class CustomeRequest:
         function_code = 79
     client = ModbusTcpClient()
     client.framer = Mock()
     client.register(CustomeRequest)
     assert client.framer.decoder.register.called_once_with(CustomeRequest)
开发者ID:bashwork,项目名称:pymodbus,代码行数:7,代码来源:test_client_sync.py


示例2: TestMbTcpClass0

class TestMbTcpClass0(unittest.TestCase):
	read_values = range(0xAA50, 0xAA60)
	write_values = range(0xBB50, 0xBB60)
	
	def setUp(self):
		self.client = ModbusTcpClient(SERVER_HOST)
		self.client.connect()
		
	def tearDown(self):
		self.client.close()
		
	def test_read_holding_registers(self):
		rv = self.client.read_holding_registers(0, 16)
		self.assertEqual(rv.function_code, 0x03)
		self.assertEqual(rv.registers, self.read_values)
		
	def test_read_holding_registers_exception(self):
		rv = self.client.read_holding_registers(16, 1)
		self.assertEqual(rv.function_code, 0x83)
		self.assertEqual(rv.exception_code, 0x02)
	
	def test_write_holding_registers(self):
		rq = self.client.write_registers(0, self.write_values)
		self.assertEqual(rq.function_code, 0x10)
		rr = self.client.read_holding_registers(0, 16)
		self.assertEqual(rr.registers, self.write_values)
		rq = self.client.write_registers(0, self.read_values)
		self.assertEqual(rq.function_code, 0x10)
		
	def test_write_holding_registers_exception(self):
		rq = self.client.write_registers(16, [0x00])
		self.assertEqual(rq.function_code, 0x90)
		self.assertEqual(rq.exception_code, 0x02)
开发者ID:davidmalovrh,项目名称:w7500_modbustcp,代码行数:33,代码来源:mbtcp_class0.py


示例3: runModBus

def runModBus(IOVariables):
    #---------------------------------------------------------------------------#
    # choose the client you want
    #---------------------------------------------------------------------------#
    # make sure to start an implementation to hit against. For this
    # you can use an existing device, the reference implementation in the tools
    # directory, or start a pymodbus server.
    #---------------------------------------------------------------------------#
    client = ModbusTcpClient('192.168.1.9')
    #rq = client.write_registers(2048, [0])
    #rr = client.read_input_registers(000, 1)
    #print (rr.registers)       
    #---------------------------------------------------------------------------#
    # configure io card
    #---------------------------------------------------------------------------#
    #Digital_In_1 = ModbusDigitalInputIOCard(0, client)   
    #print('IOVariables in modbus.py: {IOVariables} '.format(IOVariables=IOVariables))  
    Digital_Out_1 = ModbusDigitalOutputIOCard(2048, client, IOVariables)          
    #---------------------------------------------------------------------------#
    # Run io card
    #---------------------------------------------------------------------------#
    #Digital_In_1.ReadStatus()
    #---------------------------------------------------------------------------#
    # Run io card
    #---------------------------------------------------------------------------#
    Digital_Out_1.WriteStatus()
    #---------------------------------------------------------------------------#
    # close the client
    #---------------------------------------------------------------------------#
    client.close()      
开发者ID:gurkslask,项目名称:Home-automation,代码行数:30,代码来源:ModBus.py


示例4: testTcpClientSend

    def testTcpClientSend(self):
        ''' Test the tcp client send method'''
        client = ModbusTcpClient()
        self.assertRaises(ConnectionException, lambda: client._send(None))

        client.socket = mockSocket()
        self.assertEqual(0, client._send(None))
        self.assertEqual(4, client._send('1234'))
开发者ID:zhanglongqi,项目名称:pymodbus,代码行数:8,代码来源:test_client_sync.py


示例5: testTcpClientRecv

    def testTcpClientRecv(self):
        ''' Test the tcp client receive method'''
        client = ModbusTcpClient()
        self.assertRaises(ConnectionException, lambda: client._recv(1024))

        client.socket = mockSocket()
        self.assertEqual('', client._recv(0))
        self.assertEqual('\x00'*4, client._recv(4))
开发者ID:zhanglongqi,项目名称:pymodbus,代码行数:8,代码来源:test_client_sync.py


示例6: __init__

 def __init__(self, *args, **kwargs):
     ''' Constructor
     
     default modbus port is 502'''
     #ip address
     self.addr = args[0]
     
     ModbusTcpClient.__init__(self, self.addr)
     
     self.connect()
开发者ID:mkopache,项目名称:Heating_control,代码行数:10,代码来源:thermstatctrl.py


示例7: testTcpClientConnect

    def testTcpClientConnect(self):
        ''' Test the tcp client connection method'''
        with patch.object(socket, 'create_connection') as mock_method:
            mock_method.return_value = object()
            client = ModbusTcpClient()
            self.assertTrue(client.connect())

        with patch.object(socket, 'create_connection') as mock_method:
            mock_method.side_effect = socket.error()
            client = ModbusTcpClient()
            self.assertFalse(client.connect())
开发者ID:zhanglongqi,项目名称:pymodbus,代码行数:11,代码来源:test_client_sync.py


示例8: run

 def run(self):
     comm=server_addr[random.randint(0,len(serverlist)-1)]
     client = ModbusTcpClient(comm, self.port, source_address=(self.ipaddr,0), retries=1, retry_on_empty=True)
     while(not self.clientstop.is_set()):
         client.write_coil(1, True)
         print "coil write from:" + self.ipaddr + " --->" + comm 
         time.sleep(random.randint(0,3))
     print "stopping"
     client.socket.shutdown(1)
     client.close()
     return
开发者ID:GridProtectionAlliance,项目名称:ARMORE,代码行数:11,代码来源:modbusgen.py


示例9: create_sunspec_sync_client

def create_sunspec_sync_client(host):
    """ A quick helper method to create a sunspec
    client.

    :param host: The host to connect to
    :returns: an initialized SunspecClient
    """
    modbus = ModbusTcpClient(host)
    modbus.connect()
    client = SunspecClient(modbus)
    client.initialize()
    return client
开发者ID:ccatterina,项目名称:pymodbus,代码行数:12,代码来源:sunspec_client.py


示例10: TcpRtuChannel

class TcpRtuChannel(BaseChannel):
    def __init__(self, network, channel_name, channel_protocol, channel_params, manager, channel_type):
        self.server = channel_params.get("server", "")
        self.port = channel_params.get("port", "")
        self.modbus_client = None
        BaseChannel.__init__(self, network, channel_name, channel_protocol, channel_params, manager, channel_type)

    def run(self):
        self.modbus_client = ModbusTcpClient(host=self.server, port=self.port)
        try:
            self.modbus_client.connect()
            logger.debug("连接服务器成功.")
        except Exception, e:
            logger.error("连接服务器失败,错误信息:%r." % e)
开发者ID:lianwutech,项目名称:plugin_linkworld-discard-,代码行数:14,代码来源:tcp_rtu.py


示例11: __init__

 def __init__(self, indicator_list, coil_on_list, coil_off_list, command_list, coil_list, port,
              method="rtu", timeout=0.1, unit=0x01):
     self.unit = unit
     self.indicator_list = indicator_list
     self.coil_on_list = coil_on_list
     self.coil_off_list = coil_off_list
     self.coil_list = coil_list
     self.command_list = command_list
     self.lock = threading.Lock()
     if method == "rtu":
         self.client = ModbusSerialClient(method=method, port=port, baudrate=19200, stopbits=1, bytesize=8,
                                      timeout=timeout)
     elif method == "tcp":
         self.client = ModbusTcpClient(host=port)
开发者ID:PR-2,项目名称:PINCH,代码行数:14,代码来源:modbus_connection.py


示例12: __init__

class communication:
    def __init__(self):
        self.client = None
        self.lock = threading.Lock()

    def connectToDevice(self, address):
        """Connection to the client - the method takes the IP address (as a string, e.g. '192.168.1.11') as an argument."""
        self.client = ModbusTcpClient(address)

    def disconnectFromDevice(self):
        """Close connection"""
        self.client.close()

    def sendCommand(self, data):
        """Send a command to the Gripper - the method takes a list of uint8 as an argument. The meaning of each variable depends on the Gripper model (see support.robotiq.com for more details)"""
        # make sure data has an even number of elements
        if len(data) % 2 == 1:
            data.append(0)

        # Initiate message as an empty list
        message = []

        # Fill message by combining two bytes in one register
        for i in range(0, len(data) / 2):
            message.append((data[2 * i] << 8) + data[2 * i + 1])

        # To do!: Implement try/except
        with self.lock:
            self.client.write_registers(0, message)

    def getStatus(self, numBytes):
        """Sends a request to read, wait for the response and returns the Gripper status. The method gets the number of bytes to read as an argument"""
        numRegs = int(ceil(numBytes / 2.0))

        # To do!: Implement try/except
        # Get status from the device
        with self.lock:
            response = self.client.read_input_registers(0, numRegs)

        # Instantiate output as an empty list
        output = []

        # Fill the output with the bytes in the appropriate order
        for i in range(0, numRegs):
            output.append((response.getRegister(i) & 0xFF00) >> 8)
            output.append(response.getRegister(i) & 0x00FF)

        # Output the result
        return output
开发者ID:WalkingMachine,项目名称:Short-Circuit,代码行数:49,代码来源:comModbusTcp.py


示例13: run

 def run(self):
     self.modbus_client = ModbusTcpClient(host=self.server, port=self.port)
     try:
         self.modbus_client.connect()
         logger.debug("连接服务器成功.")
     except Exception, e:
         logger.error("连接服务器失败,错误信息:%r." % e)
开发者ID:lianwutech,项目名称:plugin_linkworld-discard-,代码行数:7,代码来源:tcp_rtu.py


示例14: LEDUpdateThread

def LEDUpdateThread(pin, update_interval, e):
	print 'LEDUpdateThread'
	client = ModbusTcpClient(args.ip)
	GPIO.setup(pin, GPIO.OUT)
	while True:
		if not e.isSet():
			result = client.read_coils(0, 1)
			if result is not None:
				pin_status = result.bits[0]
				GPIO.output(pin, pin_status)
				print pin_status
			time.sleep(update_interval)
		else:
			break
	client.close()
	print 'LED thread stoped'
开发者ID:szolotykh,项目名称:modbus-raspberrypi,代码行数:16,代码来源:device.py


示例15: start

  def start(self):
    # Connect the client
    self.client = ModbusTcpClient(self.modbus_ip)
    logger=logging.getLogger("service")
    logger.info("Starting Service: " + self.componentId + " (ModbusIO: " + self.xmlname + ").")
    while (GlobalObjects.running):
     
      if(self.client.connect()):
        # Check if the output variables changed, if needed send the new state
        if (self.state_write_changed):
          self.state_write_changed = False
          try:
            self.client.write_registers(self.modbus_write_adres,self.state_write)
          except (AttributeError, ConnectionException):
            state_read_new=self.state_read_old
            pass
     
        # sleep a while
        time.sleep(self.modbus_period)
          
        # try to read new state of the inputs, if connection fails, continue
        try:
          state_read_new = self.client.read_holding_registers(self.modbus_read_adres,
                                                              self.modbus_read_size).registers
        except (AttributeError, ConnectionException):
          state_read_new=self.state_read_old
          pass

        # Read check every register for changes
        for i in range( len(state_read_new)):
          if (state_read_new[i]!=self.state_read_old[i]):
            # When the change is in a DI
            if (self.modbus_read_type[i]=='D'):
              # Mark changed positions
              changed=state_read_new[i]^self.state_read_old[i]
              for j in range(16):
                if((changed >> j) & 1):
                  # Generate event
                  eventName=self.configuration["input"]["adres_definition"]["adres"][i]["event_" + str(j)]
                  self.generateEvent(eventName,{'value':((state_read_new[i]>>j) & 1)})
                  #print str(time.time()) + " " + str(i) + "." + str(j) + ": " + str((state_read_new[i]>>j) & 1)
              ## Save the new state
              self.state_read_old[i]=state_read_new[i]
            # When the change is in a AI
            elif (self.modbus_read_type[i]=='A'):
              diff = (state_read_new[i]-self.state_read_old[i])
              if ((diff>self.modbus_read_analog_threshold[i]) or (diff < -self.modbus_read_analog_threshold[i])):
                # Generate event
                eventName=self.configuration["input"]["adres_definition"]["adres"][i]["event"]
                value = float(state_read_new[i]) / self.modbus_read_analog_scale[i]
                self.generateEvent(eventName,{'value':value})
                #print str(i) + ": " + str(value)
 
                ## Save new state
                self.state_read_old[i]=state_read_new[i]
           
      else:
         logger.warning("Lost connection to modbus module.")
         self.state_write_changed=True 
         time.sleep(1)
开发者ID:jphermans,项目名称:JellyControl,代码行数:60,代码来源:ModbusIO.py


示例16: connect

 def connect(self):
     """Open Modbus connection
     """
     if not self.connected:
         self.client = ModbusTcpClient("192.168.50.238", port=502)
         self.client.connect()
         self.connected = True
开发者ID:jegger,项目名称:roboflexMS,代码行数:7,代码来源:modbus.py


示例17: on_switchMain_activate

 def on_switchMain_activate(self, switch,gparam):
     if switch.get_active():
         self.client_1 = ModbusClient(manifold_host_1, port=manifold_port_1)
         self.client_p = ModbusClient(pump_host, port=pump_port)
         self.client_1.connect()
         self.client_p.connect()
         self.readDataSetupConfig()
         time.sleep(2)
         print "start connection"
         time_delay = 1 # 1 seconds delay
         self.loop = LoopingCall(f=self.logging_data, a=(self.client_1, builder.get_object("txtAIN1"),builder.get_object("txtAIN2"),builder.get_object("txtAIN1ENG"),builder.get_object("txtAIN2ENG"),builder.get_object("txtAIN12"),builder.get_object("txtAIN22"),builder.get_object("txtAIN1ENG2"),self.client_p,builder.get_object("txtPout"),builder.get_object("txtQout")))
         self.loop.start(time_delay, now=False) # initially delay by time
         builder.get_object("btnOpenFile").set_sensitive(False)
         builder.get_object("btnOff").set_sensitive(False)
         self.btnAnalyze.set_sensitive(False)
         # self.ani = animation.FuncAnimation(self.figure, self.update_plot, interval = 1000)
     else:
         self.loop.stop()
         time.sleep(1)
         self.client_1.close()
         self.client_p.close()
         print "stop connection"
         time.sleep(2)
         builder.get_object("txtFilePath").set_text(export_csv_path)
         builder.get_object("btnOpenFile").set_sensitive(True)
         builder.get_object("btnOff").set_sensitive(True)
         if self.oneLogged:
             self.btnAnalyze.set_sensitive(True)
开发者ID:andreadanzi,项目名称:pymodbus,代码行数:28,代码来源:hdlf_gui_manual.py


示例18: _run_single_client

def _run_single_client(client_id, host, port, units, results, N, delay, warmup, table):
    _log.info('Client %d connecting to %s (%s)', client_id, host, port)
    client = ModbusClient(host, port=port)
    client.connect()

    _log.info('Client %d connected to %s (%s)', client_id, host, port)

    measurements = []
    errors = []
    for i in xrange(N):
        if N >= 1000 and i % (N/10) == 0 and i > 0:
            _log.info('Client %d %.0f%% complete', client_id, 100.0*i/N)
        try:
            m = _make_random_request(client, units, table)
            if i >= warmup or N <= warmup:
                if i == warmup:
                    _log.info('Client %d warmup complete.', client_id)
                measurements.append(m)
        except jem_exceptions.JemException, e:
            errors.append(e)
            _log.warn('Client %d received error response: %s', client_id, e)
        except Exception, e:
            from pymodbus import exceptions as es
            _log.error("Caught other exception: %s" % str(e))
            _log.error("Is instance: %s", isinstance(e, es.ModbusException))
开发者ID:sprily,项目名称:jem-data,代码行数:25,代码来源:performance.py


示例19: connect_modbus

def connect_modbus(modbus_ip, modbus_port):
    try:
        client = ModbusClient(modbus_ip, port=modbus_port)
    except:
        return None
    if client.connect():
        return client
    return None
开发者ID:MichalZG,项目名称:ObsAssistant,代码行数:8,代码来源:asystent.py


示例20: get_point_sync

 def get_point_sync(self, point_name):    
     register = self.point_map[point_name]
     client = SyncModbusClient(self.ip_address, port=self.port)
     try:
         result = register.get_state_sync(client)
     except (ConnectionException, ModbusIOException, ModbusInterfaceException):
         result = None
     finally:
         client.close()
     return result
开发者ID:NREL,项目名称:volttron,代码行数:10,代码来源:modbus.py



注:本文中的pymodbus.client.sync.ModbusTcpClient类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sync.ModbusUdpClient类代码示例发布时间:2022-05-27
下一篇:
Python sync.ModbusSerialClient类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap