本文整理汇总了Python中settings.init函数的典型用法代码示例。如果您正苦于以下问题：Python init函数的具体用法？Python init怎么用？Python init使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了init函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: start
def start():
    """Start the KVObjectsManager: load settings and create its helpers.

    Sets ``KVObjectsManager._initialized``, calls ``settings.init()``,
    creates the publisher, subscriber, sender, event processor and TTL
    processor, then builds an ``origin`` KVObject carrying this machine's
    hostname and calls ``notify()`` on it.

    Raises:
        RuntimeError: if ``KVObjectsManager._initialized`` is already set.
    """
    # NOTE(review): the indentation of this snippet was lost. The lock is
    # assumed to guard only the check-and-set of ``_initialized`` -- confirm
    # against the upstream file.
    with KVObjectsManager.__lock:
        # this lock is not really needed here, since the start() method
        # should only be called one time per process.
        if KVObjectsManager._initialized:
            raise RuntimeError("KVObjectsManager already running")
        KVObjectsManager._initialized = True

    settings.init()

    KVObjectsManager._publisher = Publisher(KVObjectsManager)
    KVObjectsManager._subscriber = Subscriber(KVObjectsManager)
    KVObjectsManager._sender = ObjectSender(KVObjectsManager)
    KVObjectsManager._event_processor = EventProcessor()
    KVObjectsManager._ttl_processor = TTLProcessor()

    # Build the "origin" object for this process and notify with it.
    origin_obj = KVObject(collection="origin")
    import socket
    origin_obj.hostname = socket.gethostname()
    origin_obj.notify()
开发者ID:alshayed,项目名称:sapphire-kv,代码行数:26,代码来源:kvobject.py
示例2: main
def main():
    """Start the NewsGrabber worker threads and idle until shutdown.

    Initialises settings, starts the logger thread, creates the working
    directories, logs an error for each missing rsync target file, starts
    the IRC bot, uploader and service runner as daemon threads, and then
    sleeps in one-second steps while ``settings.running`` is truthy.
    """
    settings.init()

    settings.logger = log.Log(settings.log_file_name)
    settings.logger.daemon = True
    settings.logger.start()
    settings.logger.log('Starting NewsGrabber')

    for directory in (
        settings.dir_new_urllists,
        settings.dir_old_urllists,
        settings.dir_donefiles,
        settings.dir_ready,
        settings.dir_last_upload,
    ):
        tools.create_dir(directory)

    # Missing target files are only reported; start-up continues regardless.
    if not os.path.isfile('rsync_targets'):
        settings.logger.log("Please add one or more rsync targets to file 'rsync_targets'", 'ERROR')
    if not os.path.isfile('rsync_targets_discovery'):
        settings.logger.log("Please add one or more discovery rsync targets to file 'rsync_targets_discovery'", 'ERROR')

    settings.irc_bot = irc.IRC()
    settings.irc_bot.daemon = True
    settings.irc_bot.start()

    settings.upload = upload.Upload()
    settings.upload.daemon = True
    settings.upload.start()

    settings.run_services = service.RunServices()
    settings.run_services.daemon = True
    settings.run_services.start()

    # Keep the main thread alive while the daemon workers run.
    while settings.running:
        time.sleep(1)
开发者ID:lucasRolff,项目名称:NewsGrabber,代码行数:30,代码来源:main.py
示例3: __init__
def __init__(self):
    """Set empty defaults, load settings, then initialise configs and areas."""
    self.database_dir = ''
    self.storage_dir = ''
    self.shelves = []

    settings.init()
    self.__initialize_configs()
    self.__initialize_areas()
开发者ID:lmingcsce,项目名称:papershelf,代码行数:8,代码来源:papershelf.py
示例4: main
def main():
    """Start the web application.

    Initialises settings, creates the ``App`` instance, makes it listen on
    ``options.port`` and starts the tornado IOLoop.
    """
    settings.init()
    application = App()
    application.listen(options.port)  # pylint: disable=E1101
    tornado.ioloop.IOLoop.instance().start()
开发者ID:hacder,项目名称:wad,代码行数:8,代码来源:web.py
示例5: update
def update():
    """
    A good way to check all things - load and sign again.

    Reads the local identity file, installs it as the local identity,
    signs it, saves it and prints its serialized form.
    """
    dhnio.init()
    settings.init()
    src = dhnio.ReadTextFile(settings.LocalIdentityFilename())
    misc.setLocalIdentity(identity(xmlsrc=src))
    misc.getLocalIdentity().sign()
    misc.saveLocalIdentity()
    # A single parenthesised argument prints the same on Python 2 and 3.
    print(misc.getLocalIdentity().serialize())
开发者ID:vesellov,项目名称:datahaven,代码行数:11,代码来源:identity.py
示例6: main
def main(argv):
    """Parse the command line and run the evolution for each crossover type.

    Options:
        -h, --help               print usage and exit
        -f, --file FILE          passed to ``settings.init``
        -i, --max-iterations N   maximum iterations (default 1000)
        -p, --population N       population size (default 500)
        -t, --type A,B,...       comma-separated crossover methods
        -s, --stop               stop early once best fitness <= 0.001

    Args:
        argv: the argument list without the program name.
    """
    crossover_methods = []
    max_iterations = 1000
    pop_size = 500
    stop = False

    try:
        # Long options that take a value need a trailing '='. Without it
        # getopt returns an empty argument and int('') fails. The boolean
        # long option is 'stop', matching the '--stop' test below.
        opts, _args = getopt.getopt(argv, 'hf:i:p:t:s', [
            'help',
            'file=',
            'max-iterations=',
            'population=',
            'type=',
            'stop',
        ])
    except getopt.GetoptError:
        usage()
        sys.exit(2)

    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage()
            sys.exit()
        elif opt in ('-f', '--file'):
            settings.init(arg)
        elif opt in ('-i', '--max-iterations'):
            max_iterations = int(arg)
        elif opt in ('-p', '--population'):
            pop_size = int(arg)
        elif opt in ('-t', '--type'):
            crossover_methods = arg.split(',')
        elif opt in ('-s', '--stop'):
            stop = True

    start_time = time.time()
    for method in crossover_methods:
        P = Population(pop_size)
        for i in range(max_iterations):
            try:
                P.evolve(method)
            except KeyError:
                # Same handling as the original: print usage and exit.
                usage()
                sys.exit()
            # Floor division keeps the integer percentage on Python 2 and 3.
            update_progress(i * 100 // max_iterations, P)
            if stop and P.get_best_fitness() <= 0.001:
                break
        # NOTE(review): the indentation of this snippet was lost. The
        # per-method report is assumed to sit inside the method loop (so P
        # is always bound here) -- confirm against the upstream file.
        update_progress(100, P)
        print('')
        print('processing time: {}'.format(time.time() - start_time))
        print('')
        P.result()
开发者ID:etienne-bondot,项目名称:GA-GradientDescentPolynomialRegression,代码行数:52,代码来源:main.py
示例7: main
def main():
    """Set up the server socket and serial port, then serve clients forever.

    Starts one thread for ``Server.update_sensors`` and one for
    ``Server.update_camera``, then accepts client connections in a loop and
    starts a ``Server.run`` thread for each one.
    """
    print_lock = Lock()

    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    Server.setupSocket(print_lock, server_addr, server_socket)

    rasp_serial = serial.Serial(
        port='/dev/ttyACM0',  # Linux
        # port='COM8',  # Windows
        baudrate=9600,
    )

    settings.init()

    threads = []

    update_sensors_thread = Thread(
        target=Server.update_sensors,
        args=(print_lock, rasp_serial)
    )
    threads.append(update_sensors_thread)
    update_sensors_thread.start()

    update_cameras_thread = Thread(
        target=Server.update_camera
    )
    threads.append(update_cameras_thread)
    update_cameras_thread.start()

    # NOTE(review): nothing in this function sets should_continue to False,
    # so the join code below only runs if that changes.
    should_continue = True
    while should_continue:
        with print_lock:
            print("Waiting for new client...")
        # NOTE(review): indentation was lost. accept() is assumed to run
        # outside the print lock so other threads can keep printing.
        client, addr = server_socket.accept()
        thread = Thread(target=Server.run, args=(print_lock, client))
        threads.append(thread)
        thread.start()

    with print_lock:
        print("Joining threads.")
    for thread in threads:
        thread.join()
    with print_lock:
        print("All threads closed.")
开发者ID:LoganRickert,项目名称:NASA-Robot-Arduino-RPI,代码行数:52,代码来源:server_main.py
示例8: setUp
def setUp(self):
    """Create autospec mocks for tickets and identity and override settings."""
    self.tickets = mock.create_autospec(GestioTiquets)
    # NOTE(review): the e-mail value looks like an anti-scraping placeholder
    # left by the page this snippet was copied from -- confirm the real value.
    self.tickets.consulta_tiquet_dades.return_value = {
        "solicitant": "usuari.real",
        "emailSolicitant": "[email protected]"
    }
    self.identitat = mock.create_autospec(GestioIdentitat)
    self.identitat.obtenir_uid.return_value = None

    settings.init()
    settings.set("regex_reply", "(.*)")  # one that always finds something
    settings.set("regex_privat", "X")  # one that never finds anything
    settings.set("usuari_extern", "usuari.extern")
开发者ID:aaguilera,项目名称:mailtoticket,代码行数:13,代码来源:test_reply.py
示例9: c_future_wealth
def c_future_wealth(fut_period = 1, coh = 1, exo = True):
    """Build an interpolated consumption function over rebate sizes.

    For each of 11 future rebate sizes in [0, 1] the model is re-solved and
    its consumption function at ``t_eval - fut_period`` is evaluated at
    ``coh``. The last solved model is then evaluated at ``coh + rebate`` for
    the 10 non-zero rebate sizes. The 21 values are interpolated linearly
    over ``np.linspace(0, 2, num=21)``.

    Args:
        fut_period: offset subtracted from ``t_eval`` to pick the cFunc.
        coh: value at which the consumption function is evaluated.
        exo: if true use ``baseline_params``, otherwise
            ``init_natural_borrowing_constraint``.

    Returns:
        A ``LinearInterp`` built from the 21 sampled values.
    """
    c_list = []
    rebate_fut_vals = np.linspace(0, 1, num=11)
    rebate_curr_vals = rebate_fut_vals[1:]

    for rebate_fut in rebate_fut_vals:
        # The rebate size is set before settings.init() is called, as in
        # the original statement order.
        settings.rebate_size = rebate_fut
        settings.init()
        if exo:
            IndShockExample = Model.IndShockConsumerType(**baseline_params)
        else:
            IndShockExample = Model.IndShockConsumerType(**init_natural_borrowing_constraint)
        IndShockExample.solve()
        IndShockExample.unpack_cFunc()
        IndShockExample.timeFwd()
        c_list = np.append(c_list, IndShockExample.cFunc[t_eval-fut_period](coh))

    # This loop runs after the one above, not inside it: 11 + 10 samples
    # match the 21 grid points used for the interpolation below.
    for rebate_cur in rebate_curr_vals:
        c_list = np.append(c_list, IndShockExample.cFunc[t_eval-fut_period](coh+rebate_cur))

    c_func = LinearInterp(np.linspace(0, 2, num=21), np.array(c_list))
    return(c_func)
开发者ID:ganong123,项目名称:HARK,代码行数:19,代码来源:test_future_wealth.py
示例10: main
def main():
    """Run the HTTP transport in 'receive' or 'send' mode.

    The mode is chosen by the presence of the word ``receive`` or ``send``
    anywhere in ``sys.argv``; ``receive`` wins if both are present. In send
    mode ``sys.argv[2]`` is the HTTP server port and ``sys.argv[3]`` and
    ``sys.argv[4]`` are passed to ``send``. With neither word present,
    ``usage()`` is called.
    """
    if 'receive' in sys.argv:
        mode = 'receive'
    elif 'send' in sys.argv:
        mode = 'send'
    else:
        usage()
        return

    # Both modes share the same initialisation sequence.
    ## log.startLogging(sys.stdout)
    settings.init()
    settings.update_proxy_settings()
    contacts.init()
    init()

    if mode == 'receive':
        receive()
    else:
        start_http_server(int(sys.argv[2]))
        send(sys.argv[3], sys.argv[4])
    reactor.run()
开发者ID:vesellov,项目名称:datahaven,代码行数:20,代码来源:transport_http.py
示例11: main
def main(argv=None):
    """Connect to the robot, run the scanning/collision UI, then shut down.

    Args:
        argv: accepted but not used by this function.
    """
    # instantiate COMM object
    comm = RobotComm(1, -50)  # maxRobot = 1, minRSSI = -50
    if comm.start():
        print('Communication starts')
    else:
        print('Error: communication')
        return

    # instantiate Robot
    robotList = comm.get_robotList()

    # instantiate global variables gFrame and gBehavior
    settings.init()

    settings.gFrame = tk.Tk()
    settings.gFrame.geometry('600x500')

    gRobotDraw = draw.RobotDraw(settings.gFrame, tk)

    # create behaviors
    settings.gBehaviors[0] = scanning.Behavior("scanning", robotList, 4.0, gRobotDraw.get_queue())
    settings.gBehaviors[1] = collision.Behavior("collision", robotList, -50)

    gRobotDraw.start()
    settings.gFrame.mainloop()

    # NOTE(review): behavior_threads is not defined in this function;
    # presumably it is a module-level list -- confirm.
    for behavior in behavior_threads:
        # The double space reproduces the Python 2 output of
        # `print "joining... ", name`.
        print("joining...  %s" % (behavior.getName(),))
        behavior.join()
        print("%s joined!" % (behavior.getName(),))

    for robot in robotList:
        robot.reset()

    comm.stop()
    comm.join()

    print("terminated!")
开发者ID:crognale,项目名称:cs123,代码行数:39,代码来源:reference_hamster_main.py
示例12: main
def main():
    """Initialise the transport from command-line options and run the reactor.

    Stores ``options.basedir`` and ``options.debug`` in the module globals
    ``_BaseDir`` and ``_Debug``, initialises dhnio, settings and the
    identity cache, registers ``shutdown_final`` to run before reactor
    shutdown, and attaches ``initDone`` to the deferred returned by
    ``init()``.
    """
    global _BaseDir
    global _Debug

    options, args = parseCommandLine()
    _BaseDir = options.basedir
    _Debug = options.debug

    dhnio.init()
    dhnio.SetDebug(18)
    settings.init()
    # Imported here, after settings.init(), as in the original.
    import identitycache
    identitycache.init()
    reactor.addSystemEventTrigger('before', 'shutdown', shutdown_final)

    def initDone(state, options, args):
        """React to the state passed on by ``init()``'s deferred."""
        if state != 'online':
            print('state is %s, exit' % state)
            reactor.stop()
            return
        if options.send:
            send(args[0], args[1])
            return
        if options.receive:
            print('state is %s, receiving...' % state)
            return
        print('ONLINE !!!')

    init().addCallback(initDone, options, args)
    reactor.run()
开发者ID:vesellov,项目名称:datahaven,代码行数:38,代码来源:transport_cspace.py
示例13: __init__
def __init__(self):
    """Initialise the base class with the handlers and settings.init() options.

    The handler list is currently empty; the only entry is commented out.
    """
    handlers = [
        # url(r'/', MainHandler),
    ]
    app_settings = settings.init()
    super().__init__(handlers, **app_settings)
开发者ID:yukirin,项目名称:skel_tornado,代码行数:6,代码来源:main.py
示例14: Eve
#!/usr/bin/python3
from eve import Eve
from eve_docs import eve_docs
from flask_bootstrap import Bootstrap
from utils.auth import APIAuth
from settings import init
# start eve: create the app with API authentication and apply the settings
app = Eve(auth=APIAuth())
init(app)

# eve_docs addon: documentation is registered under /docs
Bootstrap(app)
app.register_blueprint(eve_docs, url_prefix='/docs')

if __name__ == '__main__':
    # run: listen on all interfaces, port 80
    app.run(host='0.0.0.0', port=80)
开发者ID:eozcelik,项目名称:Zotful,代码行数:20,代码来源:run.py
示例15: main
def main():
sys.path.append('..')
def _go_stun(port):
print '+++++ LISTEN UDP ON PORT', port, 'AND RUN STUN DISCOVERY'
stun.stunExternalIP(close_listener=False, internal_port=port, verbose=False).addBoth(_stuned)
def _stuned(ip):
if stun.getUDPClient() is None:
print 'UDP CLIENT IS NONE - EXIT'
reactor.stop()
return
print '+++++ EXTERNAL UDP ADDRESS IS', stun.getUDPClient().externalAddress
if sys.argv[1] == 'listen':
print '+++++ START LISTENING'
return
if sys.argv[1] == 'connect':
print '+++++ CONNECTING TO REMOTE MACHINE'
_try2connect()
return
lid = misc.getLocalIdentity()
udp_contact = 'udp://'+stun.getUDPClient().externalAddress[0]+':'+str(stun.getUDPClient().externalAddress[1])
lid.setProtoContact('udp', udp_contact)
lid.sign()
misc.setLocalIdentity(lid)
misc.saveLocalIdentity()
print '+++++ UPDATE IDENTITY', str(lid.contacts)
_send_servers().addBoth(_id_sent)
def _start_sending_ip():
init(stun.getUDPClient())
A().debug = True
reactor.callLater(1, Start)
def _start_session():
# transport_udp_session.SetStateChangedCallbackFunc(_state_changed)
address = (sys.argv[2], int(sys.argv[3]))
sess = transport_udp_session.open_session(address)
filename = sys.argv[4]
loop_delay = None if len(sys.argv)<6 else int(sys.argv[5])
transport_udp_session._StateChangedCallbackFunc = lambda index, old, new: _state_changed(index, address[0], new, filename, loop_delay)
sess.automat('init', None)
reactor.callLater(2, _start_session)
def _state_changed(index, ip, newstate, filename, loop_delay):
print '+++++ STATE CHANGED [%s]' % newstate
sess = automat.objects().get(index)
if newstate == 'CONNECTED':
transport_udp_session.SetStateChangedCallbackFunc(None)
if loop_delay:
reactor.callLater(2, LoopingCall(send, filename, sess.remote_address[0], sess.remote_address[1]).start, loop_delay, True)
else:
reactor.callLater(2, send, filename, sess.remote_address[0], sess.remote_address[1])
def _send_servers():
import tmpfile, misc, nameurl, settings, transport_tcp
sendfile, sendfilename = tmpfile.make("propagate")
os.close(sendfile)
LocalIdentity = misc.getLocalIdentity()
dhnio.WriteFile(sendfilename, LocalIdentity.serialize())
dlist = []
for idurl in LocalIdentity.sources:
# sources for out identity are servers we need to send to
protocol, host, port, filename = nameurl.UrlParse(idurl)
port = settings.IdentityServerPort()
d = Deferred()
transport_tcp.sendsingle(sendfilename, host, port, do_status_report=False, result_defer=d, description='Identity')
dlist.append(d)
dl = DeferredList(dlist, consumeErrors=True)
print '+++++ IDENTITY SENT TO %s:%s' % (host, port)
return dl
def _try2connect():
remote_addr = dhnio.ReadTextFile(sys.argv[3]).split(' ')
remote_addr = (remote_addr[0], int(remote_addr[1]))
t = int(str(int(time.time()))[-1]) + 1
data = '0' * t
stun.getUDPClient().transport.write(data, remote_addr)
print 'sent %d bytes to %s' % (len(data), str(remote_addr))
reactor.callLater(1, _try2connect)
def _id_sent(x):
print '+++++ ID UPDATED ON THE SERVER', x
if sys.argv[1] == 'send':
_start_sending()
elif sys.argv[1] == 'sendip':
_start_sending_ip()
elif sys.argv[1] == 'receive':
_start_receiving()
def _start_receiving():
idurl = sys.argv[2]
if not idurl.startswith('http://'):
idurl = 'http://'+settings.IdentityServerName()+'/'+idurl+'.xml'
print '+++++ START RECEIVING FROM', idurl
_request_remote_id(idurl).addBoth(_receive_from_remote_peer, idurl)
#.........这里部分代码省略.........
开发者ID:vesellov,项目名称:datahaven,代码行数:101,代码来源:transport_udp.py
示例16: types
# Options read from the parsed command-line namespace ``args``.
appendhash = args.appendhash
# whether to dump section-segment mapping
secseg_mapping = args.secmapping
# number of distinct cell types (same branching and compartments)
# each cell has random type [0:ntype]
# Derived from the total cell count (nring * ncell) and ncell_per_type.
ntype = int((nring * ncell - 1) / ncell_per_type + 1)
# NOTE(review): these imports come after the option handling above;
# presumably the order is deliberate -- confirm before moving them.
from ring import *
from neuron import h
from commonutils import *
import settings
# initialize global variables
settings.init(usegap, nring)
# note that if branching is small and variation of nbranch and ncompart
# is small then not all types may have distinct topologies
# CoreNEURON will print number of distinct topologies.
# NOTE(review): presumably sets the Random123 global index from the --gran
# option -- confirm against the NEURON documentation.
h.Random().Random123_globalindex(args.gran)
h.load_file('stdgui.hoc')
pc = h.ParallelContext()
# set number of threads
pc.nthread(args.nt, 1)
开发者ID:nrnhines,项目名称:ringtest,代码行数:31,代码来源:ringtest.py
示例17: main
def main():
    """Build the game scene (player, enemy, walls) and run it.

    Initialises settings and the director window, creates the layers,
    registers one player tank, one enemy tank and two rows of brick walls
    with the collision manager, then runs the scene with a keyboard state
    handler attached to the window.
    """
    # Initialize the window.
    settings.init()
    director.director.init(
        width=settings.dimensions['x'],
        height=settings.dimensions['y'],
        do_not_scale=True,
        resizable=True,
    )
    settings.collision_manager = cm.CollisionManagerBruteForce()

    # Create the game layer and the batch layers nested inside it.
    settings.layers['game'] = Game()
    for layer_name in ('bullets', 'walls', 'enemies'):
        settings.layers[layer_name] = BatchNode()

    # Player tank.
    player_tank = ETank()
    player_tank.do(UserTankMovingHandlers())
    settings.collision_manager.add(player_tank)
    settings.layers['game'].add(player_tank)
    settings.layers['game'].add(player_tank.getGunSprite())
    settings.objects['players'].append(player_tank)

    for layer_name in ('bullets', 'walls', 'enemies'):
        settings.layers['game'].add(settings.layers[layer_name])

    # Enemy tank.
    enemy = KVTank()
    enemy.position = (800, 800)
    enemy.do(EnemyTankMovingHandlers())
    settings.collision_manager.add(enemy)
    settings.objects['enemies'].append(enemy)
    settings.layers['enemies'].add(enemy)
    settings.layers['enemies'].add(enemy.getGunSprite())

    # Two rows of brick walls at y = 500: 20 bricks starting at x = 0 and
    # 30 bricks starting at x = 680, spaced 32 units apart.
    for brick_count, x_offset in ((20, 0), (30, 680)):
        for i in range(brick_count):
            wall = BrickWall()
            wall.update_position(i*32 + x_offset, 500)
            settings.collision_manager.add(wall)
            settings.objects['walls'].append(wall)
            settings.layers['walls'].add(wall)

    # NOTE(review): these 100 x 50 sprites are built but never registered.
    # The collision_manager/objects/layers add calls were commented out in
    # the original. Kept as-is; consider removing the loop.
    for i in range(100):
        for j in range(50):
            wall = sprite.Sprite('sprites/walls/adesert_cracks_5x5.jpg')
            wall.position = (i*5 + 200, 800 + j*5)
            wall.scale = 1
            wall.cshape = cm.AARectShape(
                wall.position,
                wall.width // 2,
                wall.height // 2
            )

    # Create a scene and set its initial layer.
    main_scene = scene.Scene(settings.layers['game'])

    # Attach a KeyStateHandler to the keyboard object.
    settings.keyboard = key.KeyStateHandler()
    director.director.window.push_handlers(settings.keyboard)

    # Play the scene in the window.
    director.director.run(main_scene)
开发者ID:andrey-ladygin-loudclear,项目名称:milana,代码行数:77,代码来源:game.py
示例18: main
def main():
    """Run the joystick-driven virtual world UI and the scanning display.

    Starts robot communication, builds two Tk windows (course track and
    localization scanning), creates one virtual robot and joystick per
    robot slot with its own update thread, starts the world-drawing thread
    and enters the Tk main loop. After the loop exits it sets ``gQuit``,
    resets the robots and stops communication.
    """
    global gRobotList, gQuit
    global gCanvas, frame, gHamsterBox
    global monitor_thread, dispatch_thread
    global gBeepQueue, gWheelQueue, drawQueue
    global vWorld
    global joystick

    comm = RobotComm(gMaxRobotNum)
    comm.start()
    print('Bluetooth started')
    gRobotList = comm.robotList

    monitor_thread = False
    dispatch_thread = False

    # create UI: two separate Tkinter windows
    # 1. frame = course track display
    # 2. gFrame = localization scanning display
    frame = tk.Tk()
    canvas_width = 700  # half width
    canvas_height = 380  # half height
    gCanvas = tk.Canvas(frame, bg="white", width=canvas_width*2, height=canvas_height*2)
    draw_track()

    settings.init()
    settings.gFrame = tk.Tk()
    settings.gFrame.geometry('600x500')
    gRobotDraw = draw.RobotDraw(settings.gFrame, tk)

    # create scanning behavior
    settings.gBehaviors[0] = scanning.Behavior("scanning", gRobotList, 4.0, gRobotDraw.get_queue())
    gRobotDraw.start()

    # create 2 virtual robot data objects
    vrobot = []
    joystick = []
    keyBindings = []
    for robot_i in range(gMaxRobotNum):
        vrobot.append(virtual_robot())
        pi4 = 3.1415 / 4

        # robot starting positions
        vrobot[robot_i].set_robot_a_pos(pi4*2, -520 + robot_i * 40, +340 - robot_i * 80)

        # keyboard input
        if robot_i == 0:
            keyBindings = ['w', 's', 'a', 'd', 'x']
        elif robot_i == 1:
            keyBindings = ['i', 'k', 'j', 'l', ',']

        joystick.append(Joystick(comm, frame, gCanvas, vrobot[robot_i], robot_i, keyBindings))
        poly_points = [0, 0, 0, 0, 0, 0, 0, 0]
        joystick[robot_i].vrobot.poly_id = gCanvas.create_polygon(poly_points, fill='blue')  # robot
        joystick[robot_i].vrobot.prox_l_id = gCanvas.create_line(0, 0, 0, 0, fill="red")  # prox sensors
        joystick[robot_i].vrobot.prox_r_id = gCanvas.create_line(0, 0, 0, 0, fill="red")
        joystick[robot_i].vrobot.floor_l_id = gCanvas.create_oval(0, 0, 0, 0, outline="white", fill="white")  # floor sensors
        joystick[robot_i].vrobot.floor_r_id = gCanvas.create_oval(0, 0, 0, 0, outline="white", fill="white")

        # NOTE(review): indentation was lost. The sleep and the update
        # thread are assumed to be per robot (inside this loop) -- confirm.
        time.sleep(1)

        update_vrobot_thread = threading.Thread(target=joystick[robot_i].update_virtual_robot)
        update_vrobot_thread.daemon = True
        update_vrobot_thread.start()

    # virtual world UI
    drawQueue = Queue.Queue(0)
    vWorld = virtual_world(drawQueue, joystick[0], vrobot[0], gCanvas, canvas_width, canvas_height)
    landmark = [-500, 220, -460, 180]
    vWorld.add_obstacle(landmark)

    draw_world_thread = threading.Thread(target=draw_virtual_world, args=(vWorld,))
    draw_world_thread.daemon = True
    draw_world_thread.start()

    gui = VirtualWorldGui(vWorld, frame)
    gui.drawGrid()
    gui.drawMap()

    gCanvas.after(200, gui.updateCanvas, drawQueue)
    frame.mainloop()

    # The main loop has exited: set the quit flag and shut everything down.
    gQuit = True
    for robot in gRobotList:
        robot.reset()
    time.sleep(1.0)
    comm.stop()
    comm.join()

    print('Terminated')
开发者ID:crognale,项目名称:cs123,代码行数:92,代码来源:human.py
示例19: set_global_config
console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
if __name__ == "__main__":
configuration = set_global_config()
set_logging_from_configuration(configuration)
logger = logging.getLogger(__name__)
parser = argparse.ArgumentParser(description='Providence Monitor Framework')
parser.add_argument('--tests','-t', action='store_true')
parser.add_argument('--mode', help="specify production for production mode, or anything otherwise")
parser.add_argument('--p4change', help="specify the p4 change number to debug")
args = parser.parse_args()
settings.init(args.mode, args.p4change)
#-- Basic Credentials setup
credentials_file = configuration.get('credentials_file')
credential_key = os.environ.get('CREDENTIAL_KEY')
if credential_key is None:
credential_key = getpass.getpass('Credential Key:')
credential_manager = CredentialManager(credentials_file, credential_key)
config.credential_manager = credential_manager
## -- test just resets the db everytime --
from models import Base
from db import engine
if not settings.in_production():
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
开发者ID:donaldrivard,项目名称:Providence,代码行数:31,代码来源:providence.py
示例20: main
def main(path):
"""The main function of the program.
Mode 0 means Layer 3/4
Mode 1 means Layer 7"""
s.init(path)
# Set IN/OUT for further operations.
stdin = os.fdopen(0, 'rb')
stdout = os.fdopen(1, 'wb')
alt_stdin = os.fdopen(int(os.getenv("ALTERNATE_STDIN")), 'rb')
alt_stdout = os.fdopen(int(os.getenv("ALTERNATE_STDOUT")), 'wb')
# Give TCPConnection the output information
TCPConnection.stdout = stdout
TCPConnection.alt_stdout = alt_stdout
# All calls to inout_map except 0 or 3 give 'special', which means we got
# socket data.
#global inout_map
inout_map = {0: alt_stdout, 3: stdout}
inout_map = defaultdict(lambda: 'special', inout_map)
# Register inputs in poller for handling the channels.
global poller
poller = select.poll()
poller.register(stdin, select.POLLIN | select.POLLHUP)
poller.register(alt_stdin, select.POLLIN | select.POLLHUP)
# give the TCPConnection class access to the poller and the socket info
Connection.poller = poller
if s.mode == 1:
Connection.sockinfo = (s.config.get('Socket', 'path'))
# Start the sending routine.
t1 = threading.Thread(target=sendingRoutine)
t1.daemon = True
t1.start()
# Start the receiving routine.
t2 = threading.Thread(target=receivingRoutine, args=(poller,))
t2.daemon = True
t2.start()
# Start of main routine:
while 1:
filedescriptor, readdata = s.receivingQueue.get()
if s.mode == 0:
out = inout_map[filedescriptor]
if filedescriptor == 0:
filterpacket, status = tcpipfilter_out(readdata)
elif filedescriptor == 3:
filterpacket, status = tcpipfilter_in(readdata)
else:
raise ValueError("Unkown filedescriptor for filter.")
if status:
out.write(filterpacket)
out.flush()
s.logfile.flush()
elif s.mode == 1:
out = inout_map[filedescriptor]
if filedescriptor == 0 or filedescriptor == 3:
ethpacket = decode(readdata)
ptype = l34protocolfilter(ethpacket)
if (filedescriptor == 0) and (ptype != Protocol.Other):
# Test for filterhits.
if l34filter(ethpacket, ptype):
# If we have a match on layer34,
# proceed with layer 4 management
status, con = l4manage(ethpacket,
filedescriptor, ptype)
# Send l7 data to external controler via socket.
if status == 2:
send(Connection.SOCK, con,
getdata(ethpacket), 0)
# con.sock.send(getdata(ethpacket))
# # Forward data to target.
# elif status == 1:
# send(Connection.TAIL, None,
# readdata, 0)
# Drop packet
else:
pass
# No filterhit occured.
else:
# Send data to the other end of the wire.
send(Connection.TAIL, None, readdata, 0)
# ptype == Protocol.Other
elif (filedescriptor == 0) and (ptype == Protocol.Other):
# Send data to the other end of the wire.
send(Connection.TAIL, None, readdata, 0)
# filedescriptor == 3
elif (filedescriptor == 3) and (ptype == Protocol.TCP):
# Test for filterhits.
if l34filter(ethpacket, ptype, 1):
# Only TCP management is needed here, UDP is not
# monitored on the inward way.
status, con = l4TCPmanage(ethpacket,
filedescriptor)
# This is the way back,
# here we do not send to sock but need to
# make sure the correct seq and ack numbers
# are set in the packet we foreward.
if status == 2:
packet = con.makevalid(ethpacket)
packet = encode(packet)
#.........这里部分代码省略.........
开发者ID:Jotaro42,项目名称:wire,代码行数:101,代码来源:Wire.py
注：本文中的settings.init函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论