• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python settings.init函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中settings.init函数的典型用法代码示例。如果您正苦于以下问题:Python init函数的具体用法?Python init怎么用?Python init使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了init函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: start

    def start():
        """Bring up the KVObjectsManager machinery for this process.

        Initialises settings, creates the publisher, subscriber, sender,
        event processor and TTL processor, then builds an "origin" object
        holding the local hostname and calls ``notify()`` on it.

        Raises:
            RuntimeError: if the manager has already been started.
        """
        # The lock is not really needed here, since start() should only be
        # called one time per process.
        with KVObjectsManager.__lock:
            manager = KVObjectsManager

            if manager._initialized:
                raise RuntimeError("KVObjectsManager already running")

            # The flag is raised before the components are created.
            manager._initialized = True

            settings.init()

            manager._publisher = Publisher(manager)
            manager._subscriber = Subscriber(manager)
            manager._sender = ObjectSender(manager)
            manager._event_processor = EventProcessor()
            manager._ttl_processor = TTLProcessor()

            origin = KVObject(collection="origin")

            from socket import gethostname
            origin.hostname = gethostname()

            origin.notify()
开发者ID:alshayed,项目名称:sapphire-kv,代码行数:26,代码来源:kvobject.py


示例2: main

def main():
    """Start NewsGrabber: logger, working directories and worker threads.

    After initialising settings the function starts the logger, creates
    the working directories, logs an error for each missing rsync target
    file, starts the IRC, upload and service-runner objects as daemons and
    then sleeps in one-second steps while ``settings.running`` is true.
    """
    settings.init()

    logger = log.Log(settings.log_file_name)
    settings.logger = logger
    logger.daemon = True
    logger.start()
    logger.log('Starting NewsGrabber')

    directory_settings = (
        'dir_new_urllists',
        'dir_old_urllists',
        'dir_donefiles',
        'dir_ready',
        'dir_last_upload',
    )
    for setting_name in directory_settings:
        tools.create_dir(getattr(settings, setting_name))

    required_files = (
        ('rsync_targets',
         "Please add one or more rsync targets to file 'rsync_targets'"),
        ('rsync_targets_discovery',
         "Please add one or more discovery rsync targets to file 'rsync_targets_discovery'"),
    )
    for file_name, complaint in required_files:
        if not os.path.isfile(file_name):
            settings.logger.log(complaint, 'ERROR')

    def _launch(setting_name, worker):
        # Publish the worker on the settings module, then start it as a daemon.
        setattr(settings, setting_name, worker)
        worker.daemon = True
        worker.start()

    _launch('irc_bot', irc.IRC())
    _launch('upload', upload.Upload())
    _launch('run_services', service.RunServices())

    # Keep the main thread alive; the workers are daemons.
    while settings.running:
        time.sleep(1)
开发者ID:lucasRolff,项目名称:NewsGrabber,代码行数:30,代码来源:main.py


示例3: __init__

    def __init__(self):
        """Start with empty paths and no shelves, then load the setup.

        Settings are initialised before the private config and area
        initialisers run.
        """
        self.database_dir = self.storage_dir = ''
        self.shelves = []

        settings.init()
        self.__initialize_configs()
        self.__initialize_areas()
开发者ID:lmingcsce,项目名称:papershelf,代码行数:8,代码来源:papershelf.py


示例4: main

def main():
    """Initialise settings, bind the app to its port and run the IO loop."""
    settings.init()

    App().listen(options.port)  # pylint: disable=E1101

    io_loop = tornado.ioloop.IOLoop.instance()
    io_loop.start()
开发者ID:hacder,项目名称:wad,代码行数:8,代码来源:web.py


示例5: update

def update():
    """
    Reload the local identity from its file, sign it again, save it and
    print the serialized result - a quick end-to-end check.
    """
    dhnio.init()
    settings.init()

    identity_path = settings.LocalIdentityFilename()
    xml_text = dhnio.ReadTextFile(identity_path)
    misc.setLocalIdentity(identity(xmlsrc=xml_text))

    local_identity = misc.getLocalIdentity()
    local_identity.sign()
    misc.saveLocalIdentity()

    print(misc.getLocalIdentity().serialize())
开发者ID:vesellov,项目名称:datahaven,代码行数:11,代码来源:identity.py


示例6: main

def main(argv):
    """Run the genetic algorithm once per requested crossover method.

    Args:
        argv: command-line arguments without the program name.

    Recognised options:
        -h, --help              print usage and exit
        -f, --file FILE         input file, handed to ``settings.init``
        -i, --max-iterations N  iterations per method (default 1000)
        -p, --population N      population size (default 500)
        -t, --type A,B,...      comma-separated crossover methods
        -s, --stop              end a run early once the best fitness is
                                at most 0.001

    Exits with status 2 when the command line cannot be parsed. A
    ``KeyError`` raised by ``Population.evolve`` prints the usage text and
    exits.
    """
    filename = ''
    crossover_methods = []
    max_iterations = 1000
    pop_size = 500
    stop = False

    # Long options that take a value need a trailing '='; without it getopt
    # returns an empty argument and int('') fails. The flag is registered as
    # 'stop' so it produces the '--stop' string tested below; getopt still
    # accepts the former '--s' spelling as an unambiguous prefix.
    long_options = [
        'help',
        'file=',
        'max-iterations=',
        'population=',
        'type=',
        'stop',
    ]
    try:
        opts, args = getopt.getopt(argv, 'hf:i:p:t:s', long_options)
    except getopt.GetoptError:
        usage()
        sys.exit(2)

    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage()
            sys.exit()
        elif opt in ('-f', '--file'):
            filename = arg
            settings.init(filename)
        elif opt in ('-i', '--max-iterations'):
            max_iterations = int(arg)
        elif opt in ('-p', '--population'):
            pop_size = int(arg)
        elif opt in ('-t', '--type'):
            crossover_methods = arg.split(',')
        elif opt in ('-s', '--stop'):
            stop = True

    start_time = time.time()
    for method in crossover_methods:
        P = Population(pop_size)
        for i in range(max_iterations):
            try:
                P.evolve(method)
            except KeyError:
                usage()
                sys.exit()
            # Floor division keeps the integer percentage on Python 2 and 3.
            update_progress(i * 100 // max_iterations, P)
            if stop and P.get_best_fitness() <= 0.001:
                break
        update_progress(100, P)
        print('')
        print('processing time: {}'.format(time.time() - start_time))
        print('')
        P.result()
开发者ID:etienne-bondot,项目名称:GA-GradientDescentPolynomialRegression,代码行数:52,代码来源:main.py


示例7: main

def main():
    """Run the robot server.

    Sets up the listening socket and the serial link, initialises
    settings, starts the sensor and camera worker threads and then gives
    every accepted client its own ``Server.run`` thread.

    ``should_continue`` is never cleared inside this function, so the
    join/shutdown code after the accept loop is not reached from here.
    """
    print_lock = Lock()

    socket_lock = Lock()
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    Server.setupSocket(print_lock, server_addr, server_socket)

    rasp_serial = serial.Serial(
        port='/dev/ttyACM0',  # Linux
        # port='COM8',        # Windows
        baudrate=9600,
    )

    settings.init()

    threads = []

    def _spawn(target, args=()):
        # Create a thread, remember it for the final join, and start it.
        worker = Thread(target=target, args=args)
        threads.append(worker)
        worker.start()

    _spawn(Server.update_sensors, (print_lock, rasp_serial))
    _spawn(Server.update_camera)

    should_continue = True

    while should_continue:
        with print_lock:
            print("Waiting for new client...")

        client, addr = server_socket.accept()
        _spawn(Server.run, (print_lock, client))

    with print_lock:
        print("Joining threads.")

    for worker in threads:
        worker.join()

    with print_lock:
        print("All threads closed.")
开发者ID:LoganRickert,项目名称:NASA-Robot-Arduino-RPI,代码行数:52,代码来源:server_main.py


示例8: setUp

    def setUp(self):
        """Create autospec'd ticket and identity services and test settings.

        The ticket mock returns a fixed requester record, the identity mock
        resolves no uid, and three settings are overridden after
        ``settings.init()``.
        """
        ticket_data = {
            "solicitant": "usuari.real",
            "emailSolicitant": "[email protected]"
        }
        self.tickets = mock.create_autospec(GestioTiquets)
        self.tickets.consulta_tiquet_dades.return_value = ticket_data

        self.identitat = mock.create_autospec(GestioIdentitat)
        self.identitat.obtenir_uid.return_value = None

        settings.init()
        overrides = (
            ("regex_reply", "(.*)"),   # a pattern that always finds something
            ("regex_privat", "X"),     # a pattern that never finds anything
            ("usuari_extern", "usuari.extern"),
        )
        for key, value in overrides:
            settings.set(key, value)
开发者ID:aaguilera,项目名称:mailtoticket,代码行数:13,代码来源:test_reply.py


示例9: c_future_wealth

def c_future_wealth(fut_period = 1, coh = 1, exo = True):
    """Build a consumption function over rebate sizes from 0 to 2.

    For each of 11 future rebate sizes in [0, 1] the model is re-solved
    with ``settings.rebate_size`` set to that size and evaluated at ``coh``.
    The model from the last iteration is then evaluated at ``coh`` plus
    each of the 10 non-zero rebate sizes. The 21 values are interpolated
    linearly over ``np.linspace(0, 2, num=21)``.

    Args:
        fut_period: offset subtracted from ``t_eval`` to pick the cFunc.
        coh: base argument passed to the selected consumption function.
        exo: use ``baseline_params`` when true, otherwise
            ``init_natural_borrowing_constraint``.

    Returns:
        A ``LinearInterp`` over the collected values.
    """
    future_rebates = np.linspace(0, 1, num=11)
    current_rebates = future_rebates[1:]

    consumption = []
    for rebate_fut in future_rebates:
        settings.rebate_size = rebate_fut
        settings.init()
        params = baseline_params if exo else init_natural_borrowing_constraint
        agent = Model.IndShockConsumerType(**params)
        agent.solve()
        agent.unpack_cFunc()
        agent.timeFwd()
        value = agent.cFunc[t_eval - fut_period](coh)
        consumption = np.append(consumption, value)

    # Reuses the agent solved for the last (largest) future rebate.
    for rebate_cur in current_rebates:
        value = agent.cFunc[t_eval - fut_period](coh + rebate_cur)
        consumption = np.append(consumption, value)

    grid = np.linspace(0, 2, num=21)
    return LinearInterp(grid, np.array(consumption))
开发者ID:ganong123,项目名称:HARK,代码行数:19,代码来源:test_future_wealth.py


示例10: main

def main():
    """Run the HTTP transport in 'receive' or 'send' mode.

    The mode is chosen by the presence of the word on the command line.
    In 'send' mode ``sys.argv[2]`` is the HTTP server port and
    ``sys.argv[3]`` / ``sys.argv[4]`` are passed to ``send``. Any other
    command line prints the usage text.
    """
    def _prepare():
        # Shared start-up for both modes.
        ## log.startLogging(sys.stdout)
        settings.init()
        settings.update_proxy_settings()
        contacts.init()
        init()

    if 'receive' in sys.argv:
        _prepare()
        receive()
        reactor.run()
    elif 'send' in sys.argv:
        _prepare()
        start_http_server(int(sys.argv[2]))
        send(sys.argv[3], sys.argv[4])
        reactor.run()
    else:
        usage()
开发者ID:vesellov,项目名称:datahaven,代码行数:20,代码来源:transport_http.py


示例11: main

def main(argv=None):
    """Connect to the robot, run the scanning/collision GUI, then shut down.

    Returns early (with None) when the communication layer fails to start.
    ``argv`` is accepted but not read by this function.
    """
    # instantiate COMM object
    comm = RobotComm(1, -50) #maxRobot = 1, minRSSI = -50
    if comm.start():
        print 'Communication starts'
    else:
        print 'Error: communication'
        return

    # instantiate Robot
    robotList = comm.get_robotList()

    # instantiate global variables gFrame and gBehavior
    settings.init()

    settings.gFrame = tk.Tk()
    settings.gFrame.geometry('600x500')

    gRobotDraw = draw.RobotDraw(settings.gFrame, tk)

    # create behaviors
    settings.gBehaviors[0] = scanning.Behavior("scanning", robotList, 4.0, gRobotDraw.get_queue())
    settings.gBehaviors[1] = collision.Behavior("collision", robotList, -50)

    gRobotDraw.start()
    # Blocks here until the Tk main loop exits.
    settings.gFrame.mainloop()
    
    # NOTE(review): behavior_threads is not assigned anywhere in this
    # function; presumably a module-level list - confirm it exists, otherwise
    # this loop raises NameError.
    for behavior in behavior_threads:
        print "joining... ", behavior.getName()
        behavior.join()
        print behavior.getName(), "joined!"

    for robot in robotList:
        robot.reset()

    comm.stop()
    comm.join()

    print("terminated!")
开发者ID:crognale,项目名称:cs123,代码行数:39,代码来源:reference_hamster_main.py


示例12: main

def main():
    """Initialise the transport, then send or receive once it is online.

    Parses the command line, stores the base directory and debug flag in
    module globals, initialises dhnio, settings and the identity cache,
    registers ``shutdown_final`` to run before reactor shutdown and starts
    the reactor. The nested ``initDone`` callback runs with the state that
    ``init()`` reports.
    """
    global _BaseDir
    global _Debug
    options, args = parseCommandLine()
    _BaseDir = options.basedir 
    _Debug = options.debug
    dhnio.init()
    dhnio.SetDebug(18)
    settings.init()
    import identitycache
    identitycache.init()
    reactor.addSystemEventTrigger('before', 'shutdown', shutdown_final)
#    from transport_control import _InitDone
#    _InitDone = True
    def initDone(state, options, args):
        # Any state other than 'online' stops the reactor.
        if state != 'online':
            print 'state is %s, exit' % state
            reactor.stop()
            return
        if options.send:
#            def _random_sending(count):
#                import random
#                if count > 10:
#                    return
#                print 'sending file %s to %s' % (args[0], args[1])
#                reactor.callLater(random.randint(0, 2), _random_sending, count+1)
#                send(args[0], args[1])
#                count += 1
#            reactor.callLater(0.5, _random_sending, 0)
            # args[0] and args[1] are forwarded unchanged to send().
            send(args[0], args[1])
            return
        if options.receive:
            print 'state is %s, receiving...' % state
            return
        print 'ONLINE !!!'
    # init() here is the module-level function; its result must offer
    # addCallback (presumably a Deferred - confirm).
    init().addCallback(initDone, options, args)
    # reactor.callLater(20, A, 'shutdown')
    reactor.run() 
开发者ID:vesellov,项目名称:datahaven,代码行数:38,代码来源:transport_cspace.py


示例13: __init__

 def __init__(self):
     """Create the application with no routes and the initialised settings."""
     routes = [
         # url(r'/', MainHandler),
     ]
     tornado_options = settings.init()
     super().__init__(routes, **tornado_options)
开发者ID:yukirin,项目名称:skel_tornado,代码行数:6,代码来源:main.py


示例14: Eve

#!/usr/bin/python3

from eve import Eve
from eve_docs import eve_docs
from flask_bootstrap import Bootstrap
from utils.auth import APIAuth
from settings import init

# Create the Eve application with the project's APIAuth class, then let
# settings.init() configure it.
app = Eve(auth=APIAuth())
init(app)

# eve_docs addon: wrap the app with Bootstrap and mount the eve_docs
# blueprint under /docs.
Bootstrap(app)
app.register_blueprint(eve_docs, url_prefix='/docs')

if __name__ == '__main__':

    # Run only when executed as a script: listen on all interfaces, port 80.
    app.run(host='0.0.0.0', port=80)
开发者ID:eozcelik,项目名称:Zotful,代码行数:20,代码来源:run.py


示例15: main

def main():
    sys.path.append('..')

    def _go_stun(port):
        """Start STUN discovery on ``port``; ``_stuned`` receives the outcome."""
        print '+++++ LISTEN UDP ON PORT', port, 'AND RUN STUN DISCOVERY'
        # addBoth: _stuned is called on success and on failure alike.
        stun.stunExternalIP(close_listener=False, internal_port=port, verbose=False).addBoth(_stuned)

    def _stuned(ip):
        """Continue after STUN discovery, depending on ``sys.argv[1]``.

        Stops the reactor when no UDP client exists. 'listen' only reports,
        'connect' starts ``_try2connect``; every other mode writes the
        external UDP address into the local identity, signs and saves it,
        and sends it to the identity servers. ``ip`` is not read here.
        """
        if stun.getUDPClient() is None:
            print 'UDP CLIENT IS NONE - EXIT'
            reactor.stop()
            return

        print '+++++ EXTERNAL UDP ADDRESS IS', stun.getUDPClient().externalAddress
        
        if sys.argv[1] == 'listen':
            print '+++++ START LISTENING'
            return
        
        if sys.argv[1] == 'connect':
            print '+++++ CONNECTING TO REMOTE MACHINE'
            _try2connect()
            return

        lid = misc.getLocalIdentity()
        # externalAddress is indexed as [0] and [1] to build 'udp://<0>:<1>'.
        udp_contact = 'udp://'+stun.getUDPClient().externalAddress[0]+':'+str(stun.getUDPClient().externalAddress[1])
        lid.setProtoContact('udp', udp_contact)
        lid.sign()
        misc.setLocalIdentity(lid)
        misc.saveLocalIdentity()
        
        print '+++++ UPDATE IDENTITY', str(lid.contacts)
        _send_servers().addBoth(_id_sent)

    def _start_sending_ip():
        """Open a UDP session to the address given on the command line.

        Reads the host from ``sys.argv[2]``, the port from ``sys.argv[3]``,
        the file name from ``sys.argv[4]`` and an optional loop delay from
        ``sys.argv[5]``.
        """
        init(stun.getUDPClient())
        A().debug = True
        reactor.callLater(1, Start)
        def _start_session():
            # transport_udp_session.SetStateChangedCallbackFunc(_state_changed)
            address = (sys.argv[2], int(sys.argv[3]))
            sess = transport_udp_session.open_session(address)
            filename = sys.argv[4]
            # No loop delay unless a sixth command-line element is present.
            loop_delay = None if len(sys.argv)<6 else int(sys.argv[5])
            # The callback ignores the old state and forwards the rest to
            # _state_changed together with the target host and file name.
            transport_udp_session._StateChangedCallbackFunc = lambda index, old, new: _state_changed(index, address[0], new, filename, loop_delay)
            sess.automat('init', None)
        # The session is opened one second after Start is scheduled.
        reactor.callLater(2, _start_session)        

    def _state_changed(index, ip, newstate, filename, loop_delay):
        """Report a session state change and start sending once connected.

        On 'CONNECTED' the state callback is cleared and ``send`` is
        scheduled two seconds later - repeatedly through a LoopingCall when
        ``loop_delay`` is truthy, otherwise once. ``ip`` is not used.
        """
        print('+++++ STATE CHANGED     [%s]' % newstate)
        sess = automat.objects().get(index)
        if newstate != 'CONNECTED':
            return

        transport_udp_session.SetStateChangedCallbackFunc(None)
        if not loop_delay:
            reactor.callLater(2, send, filename, sess.remote_address[0], sess.remote_address[1])
            return

        repeater = LoopingCall(send, filename, sess.remote_address[0], sess.remote_address[1])
        reactor.callLater(2, repeater.start, loop_delay, True)

    def _send_servers():
        """Send the serialized local identity to each of its source servers.

        The identity is written to a temporary "propagate" file and sent to
        the host of every URL in ``LocalIdentity.sources``, always on
        ``settings.IdentityServerPort()``.

        Returns:
            A ``DeferredList`` (``consumeErrors=True``) with one Deferred
            per server; it is empty when the identity has no sources.
        """
        import tmpfile, misc, nameurl, settings, transport_tcp
        sendfile, sendfilename = tmpfile.make("propagate")
        os.close(sendfile)
        LocalIdentity = misc.getLocalIdentity()
        dhnio.WriteFile(sendfilename, LocalIdentity.serialize())
        dlist = []
        for idurl in LocalIdentity.sources:
            # sources for our identity are the servers we need to send to
            protocol, host, port, filename = nameurl.UrlParse(idurl)
            # The port parsed from the URL is replaced by the configured one.
            port = settings.IdentityServerPort()
            d = Deferred()
            transport_tcp.sendsingle(sendfilename, host, port, do_status_report=False, result_defer=d, description='Identity')
            dlist.append(d)
            # Reported per server inside the loop: after the loop, host and
            # port are unbound when there are no sources, and would name
            # only the last server otherwise.
            print('+++++ IDENTITY SENT TO %s:%s' % (host, port))
        return DeferredList(dlist, consumeErrors=True)

    def _try2connect():
        """Send a short burst of '0' bytes to the remote peer every second.

        The peer address is read from the file named by ``sys.argv[3]`` as
        "<host> <port>". The payload is 1 to 10 bytes long: the last digit
        of the current epoch second, plus one.
        """
        host, port_text = dhnio.ReadTextFile(sys.argv[3]).split(' ')[:2]
        remote_addr = (host, int(port_text))
        payload_size = int(time.time()) % 10 + 1
        data = '0' * payload_size
        stun.getUDPClient().transport.write(data, remote_addr)
        print('sent %d bytes to %s' % (len(data), str(remote_addr)))
        reactor.callLater(1, _try2connect)

    def _id_sent(x):
        """Pick the next step from ``sys.argv[1]`` once the identity is sent.

        ``x`` is whatever the identity-sending step produced; it is only
        printed. Modes other than 'send', 'sendip' and 'receive' do nothing
        further.
        """
        print '+++++ ID UPDATED ON THE SERVER', x
        if sys.argv[1] == 'send':
            _start_sending()
        elif sys.argv[1] == 'sendip':
            _start_sending_ip()
        elif sys.argv[1] == 'receive':
            _start_receiving()

    def _start_receiving():
        idurl = sys.argv[2]
        if not idurl.startswith('http://'):
            idurl = 'http://'+settings.IdentityServerName()+'/'+idurl+'.xml'
        print '+++++ START RECEIVING FROM', idurl
        _request_remote_id(idurl).addBoth(_receive_from_remote_peer, idurl)
#.........part of the code is omitted here.........
开发者ID:vesellov,项目名称:datahaven,代码行数:101,代码来源:transport_udp.py


示例16: types

# Copied from the parsed command-line arguments.
appendhash = args.appendhash

# whether to dump section-segment mapping
secseg_mapping = args.secmapping

# number of distinct cell types (same branching and compartments)
# each cell has random type [0:ntype]
# The expression rounds (nring * ncell) / ncell_per_type up to a whole number.
ntype = int((nring * ncell - 1) / ncell_per_type + 1)

from ring import *
from neuron import h
from commonutils import *
import settings

# initialize global variables
settings.init(usegap, nring)


# note that if branching is small and variation of nbranch and ncompart
# is small then not all types may have distinct topologies
# CoreNEURON will print number of distinct topologies.


# Set the Random123 global index from the --gran style argument (args.gran).
h.Random().Random123_globalindex(args.gran)

h.load_file('stdgui.hoc')

pc = h.ParallelContext()

# set number of threads
pc.nthread(args.nt, 1)
开发者ID:nrnhines,项目名称:ringtest,代码行数:31,代码来源:ringtest.py


示例17: main

def main():
    """Assemble the tank game scene and run it.

    Initialises settings and the director window, creates the game layer
    and its bullet/wall/enemy batches, places the player tank, one enemy
    tank and two rows of brick walls, installs a keyboard state handler
    and starts the scene.
    """
    # Initialize the window.
    settings.init()

    director.director.init(width=settings.dimensions['x'], height=settings.dimensions['y'], do_not_scale=True, resizable=True)
    settings.collision_manager = cm.CollisionManagerBruteForce()

    batch_names = ('bullets', 'walls', 'enemies')

    # Create the main layer and the batches that hang off it.
    settings.layers['game'] = Game()
    for batch_name in batch_names:
        settings.layers[batch_name] = BatchNode()

    # Player tank, driven by the user handlers.
    player = ETank()
    player.do(UserTankMovingHandlers())

    settings.collision_manager.add(player)

    settings.layers['game'].add(player)
    settings.layers['game'].add(player.getGunSprite())
    settings.objects['players'].append(player)

    for batch_name in batch_names:
        settings.layers['game'].add(settings.layers[batch_name])

    # A single enemy tank.
    enemy = KVTank()
    enemy.position = (800, 800)
    enemy.do(EnemyTankMovingHandlers())
    settings.collision_manager.add(enemy)
    settings.objects['enemies'].append(enemy)
    settings.layers['enemies'].add(enemy)
    settings.layers['enemies'].add(enemy.getGunSprite())

    def _lay_brick_row(count, x_start):
        # Place `count` bricks 32 units apart at y=500, starting at x_start.
        for column in range(count):
            brick = BrickWall()
            brick.update_position(column * 32 + x_start, 500)
            settings.collision_manager.add(brick)
            settings.objects['walls'].append(brick)
            settings.layers['walls'].add(brick)

    _lay_brick_row(20, 0)
    _lay_brick_row(30, 680)

    # These small tiles are created and positioned but never registered:
    # the three registration calls are commented out.
    for column in range(100):
        for row in range(50):
            tile = sprite.Sprite('sprites/walls/adesert_cracks_5x5.jpg')
            tile.position = (column * 5 + 200, 800 + row * 5)
            tile.scale = 1
            tile.cshape = cm.AARectShape(
                tile.position,
                tile.width // 2,
                tile.height // 2
            )
            #settings.collision_manager.add(tile)
            #settings.objects['walls'].append(tile)
            #settings.layers['walls'].add(tile)

    #animation = pyglet.image.load_animation('sprites/effects/nuke-ani.gif')
    #anim = sprite.Sprite(animation)
    #anim.position = (500, 500)
    #settings.layers['game'].add(anim)

    # Create a scene whose initial layer is the game layer.
    main_scene = scene.Scene(settings.layers['game'])

    # Attach a KeyStateHandler to the window so key state can be polled.
    settings.keyboard = key.KeyStateHandler()
    director.director.window.push_handlers(settings.keyboard)

    # Play the scene in the window.
    director.director.run(main_scene)
开发者ID:andrey-ladygin-loudclear,项目名称:milana,代码行数:77,代码来源:game.py


示例18: main

def main():
  """Run the two-window robot GUI until the course window is closed.

  Starts robot communication, builds the course-track window and the
  scanning window, creates one virtual robot and joystick per robot slot
  with its own update thread, starts the world-drawing thread and enters
  the Tk main loop. After the loop ends it sets gQuit, resets the robots
  and stops communication.
  """
  global gRobotList, gQuit
  global gCanvas, frame, gHamsterBox
  global monitor_thread, dispatch_thread
  global gBeepQueue, gWheelQueue, drawQueue
  global vWorld
  global joystick

  comm = RobotComm(gMaxRobotNum)
  comm.start()
  print 'Bluetooth started'

  gRobotList = comm.robotList

  monitor_thread = False
  dispatch_thread= False

  # create UI: two separate Tkinter windows
  # 1. frame = course track display
  # 2. gFrame = localization scanning display
  frame = tk.Tk()
  canvas_width = 700 # half width
  canvas_height = 380 # half height
  gCanvas = tk.Canvas(frame, bg="white", width=canvas_width*2, height=canvas_height*2)
  draw_track()

  settings.init()
  settings.gFrame = tk.Tk()
  settings.gFrame.geometry('600x500')
  gRobotDraw = draw.RobotDraw(settings.gFrame, tk)
  # create scanning behavior
  settings.gBehaviors[0] = scanning.Behavior("scanning", gRobotList, 4.0, gRobotDraw.get_queue())

  gRobotDraw.start()

  # create one virtual robot data object per robot slot (gMaxRobotNum)
  vrobot = []
  joystick = []
  keyBindings = []
  for robot_i in range(gMaxRobotNum):
    vrobot.append ( virtual_robot() )
    # approximation of pi/4 (uses 3.1415, not math.pi)
    pi4 = 3.1415 / 4

    # robot starting positions
    vrobot[robot_i].set_robot_a_pos(pi4*2, -520 + robot_i * 40, +340 - robot_i * 80)

    # keyboard input
    # Only slots 0 and 1 get their own keys; any later slot keeps the
    # bindings left over from the previous iteration.
    if robot_i == 0:
      keyBindings = ['w','s','a','d','x']
    elif robot_i == 1:
      keyBindings = ['i','k','j','l',',']
    joystick.append( Joystick(comm, frame, gCanvas, vrobot[robot_i], robot_i, keyBindings) )
    # Canvas items are created empty and their ids stored on the vrobot.
    poly_points = [0,0,0,0,0,0,0,0]
    joystick[robot_i].vrobot.poly_id = gCanvas.create_polygon(poly_points, fill='blue') #robot
    joystick[robot_i].vrobot.prox_l_id = gCanvas.create_line(0,0,0,0, fill="red") #prox sensors  ---- here
    joystick[robot_i].vrobot.prox_r_id = gCanvas.create_line(0,0,0,0, fill="red")
    joystick[robot_i].vrobot.floor_l_id = gCanvas.create_oval(0,0,0,0, outline="white", fill="white") #floor sensors
    joystick[robot_i].vrobot.floor_r_id = gCanvas.create_oval(0,0,0,0, outline="white", fill="white")

    time.sleep(1)

    # Each joystick updates its virtual robot on its own daemon thread.
    update_vrobot_thread = threading.Thread(target=joystick[robot_i].update_virtual_robot)
    update_vrobot_thread.daemon = True
    update_vrobot_thread.start()

  # virtual world UI
  # The world is built around the first joystick and virtual robot only.
  drawQueue = Queue.Queue(0)
  vWorld = virtual_world(drawQueue, joystick[0], vrobot[0], gCanvas, canvas_width, canvas_height)
  landmark = [-500, 220, -460, 180]
  vWorld.add_obstacle(landmark)

  draw_world_thread = threading.Thread(target=draw_virtual_world, args=(vWorld,))
  draw_world_thread.daemon = True
  draw_world_thread.start()

  gui = VirtualWorldGui(vWorld, frame)

  gui.drawGrid()
  gui.drawMap()

  # First canvas update is scheduled 200 ms after start; mainloop blocks
  # until the course window is closed.
  gCanvas.after(200, gui.updateCanvas, drawQueue)
  frame.mainloop()

  gQuit = True

  for robot in gRobotList:
    robot.reset()
  time.sleep(1.0)
  comm.stop()
  comm.join()

  print 'Terminated'
开发者ID:crognale,项目名称:cs123,代码行数:92,代码来源:human.py


示例19: set_global_config

    console.setFormatter(formatter)
    logging.getLogger('').addHandler(console)

# Script entry point: load configuration and logging, parse the command
# line, set up credentials and prepare the database schema.
if __name__ == "__main__":
    configuration = set_global_config()
    set_logging_from_configuration(configuration)
    logger = logging.getLogger(__name__)


    parser = argparse.ArgumentParser(description='Providence Monitor Framework')
    parser.add_argument('--tests','-t', action='store_true')
    parser.add_argument('--mode', help="specify production for production mode, or anything otherwise")
    parser.add_argument('--p4change', help="specify the p4 change number to debug")
    args = parser.parse_args()

    settings.init(args.mode, args.p4change)

    #-- Basic Credentials setup
    # The key comes from the CREDENTIAL_KEY environment variable; when it is
    # unset the user is prompted for it without echo.
    credentials_file = configuration.get('credentials_file')
    credential_key = os.environ.get('CREDENTIAL_KEY')
    if credential_key is None:
        credential_key = getpass.getpass('Credential Key:')
    credential_manager = CredentialManager(credentials_file, credential_key)
    config.credential_manager = credential_manager

    ## -- test just resets the db everytime --
    # Outside production every table is dropped before being recreated, so
    # existing data is discarded on each start.
    from models import Base
    from db import engine
    if not settings.in_production():
        Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)
开发者ID:donaldrivard,项目名称:Providence,代码行数:31,代码来源:providence.py


示例20: main

def main(path):
    """The main function of the program.
    Mode 0 means Layer 3/4
    Mode 1 means Layer 7"""
    
    s.init(path)

    # Set IN/OUT for further operations.
    stdin = os.fdopen(0, 'rb')
    stdout = os.fdopen(1, 'wb')
    alt_stdin = os.fdopen(int(os.getenv("ALTERNATE_STDIN")), 'rb')
    alt_stdout = os.fdopen(int(os.getenv("ALTERNATE_STDOUT")), 'wb')
    # Give TCPConnection the output information
    TCPConnection.stdout = stdout
    TCPConnection.alt_stdout = alt_stdout
     # All calls to inout_map except 0 or 3 give 'special', which means we got
    # socket data.
    #global inout_map
    inout_map = {0: alt_stdout, 3: stdout}
    inout_map = defaultdict(lambda: 'special', inout_map)
    # Register inputs in poller for handling the channels.
    global poller
    poller = select.poll()
    poller.register(stdin, select.POLLIN | select.POLLHUP)
    poller.register(alt_stdin, select.POLLIN | select.POLLHUP)
    # give the TCPConnection class access to the poller and the socket info
    Connection.poller = poller
    if s.mode == 1:
        Connection.sockinfo = (s.config.get('Socket', 'path'))
    
    # Start the sending routine.
    t1 = threading.Thread(target=sendingRoutine)
    t1.daemon = True
    t1.start()
    # Start the receiving routine.
    t2 = threading.Thread(target=receivingRoutine, args=(poller,))
    t2.daemon = True
    t2.start()
    # Start of main routine:
    while 1:
        filedescriptor, readdata = s.receivingQueue.get()
        if s.mode == 0:
            out = inout_map[filedescriptor]
            if filedescriptor == 0:
                filterpacket, status = tcpipfilter_out(readdata)
            elif filedescriptor == 3:
                filterpacket, status = tcpipfilter_in(readdata)
            else:
                raise ValueError("Unkown filedescriptor for filter.")
            if status:
                out.write(filterpacket)
                out.flush()
            s.logfile.flush()
        elif s.mode == 1:
            out = inout_map[filedescriptor]
            if filedescriptor == 0 or filedescriptor == 3:
                ethpacket = decode(readdata)
                ptype = l34protocolfilter(ethpacket)
                if (filedescriptor == 0) and (ptype != Protocol.Other):
                    # Test for filterhits.
                    if l34filter(ethpacket, ptype):
                        # If we have a match on layer34,
                        # proceed with layer 4 management
                        status, con = l4manage(ethpacket,
                                               filedescriptor, ptype)
                        # Send l7 data to external controler via socket.
                        if status == 2:
                            send(Connection.SOCK, con,
                                 getdata(ethpacket), 0)
                            # con.sock.send(getdata(ethpacket))
                        # # Forward data to target.
                        # elif status == 1:
                        #     send(Connection.TAIL, None,
                        #          readdata, 0)
                        # Drop packet
                        else:
                            pass
                    # No filterhit occured.
                    else:
                        # Send data to the other end of the wire.
                        send(Connection.TAIL, None, readdata, 0)
                # ptype == Protocol.Other
                elif (filedescriptor == 0) and (ptype == Protocol.Other):
                    # Send data to the other end of the wire.
                    send(Connection.TAIL, None, readdata, 0)
                # filedescriptor == 3
                elif (filedescriptor == 3) and (ptype == Protocol.TCP):
                    # Test for filterhits.
                    if l34filter(ethpacket, ptype, 1):
                        # Only TCP management is needed here, UDP is not 
                        # monitored on the inward way.
                        status, con = l4TCPmanage(ethpacket,
                                               filedescriptor)
                        # This is the way back,
                        # here we do not send to sock but need to
                        # make sure the correct seq and ack numbers
                        # are set in the packet we foreward.
                        if status == 2:
                            packet = con.makevalid(ethpacket)
                            packet = encode(packet)
#.........part of the code is omitted here.........
开发者ID:Jotaro42,项目名称:wire,代码行数:101,代码来源:Wire.py



注:本文中的settings.init函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python settings.lazy_gettext函数代码示例发布时间:2022-05-27
下一篇:
Python settings.getnodes函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap