• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python subprocess.popen函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中subprocess.popen函数的典型用法代码示例。如果您正苦于以下问题:Python popen函数的具体用法?Python popen怎么用?Python popen使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。(注意:标准库中的类名是 subprocess.Popen;下文示例中的 popen 多为 `from subprocess import Popen as popen` 形式的别名。)



在下文中一共展示了popen函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: getTrackedItems

def getTrackedItems():
    """Return the set of paths that git reports with the ``fit`` attribute set.

    The output of ``git ls-files -o`` is piped into
    ``git check-attr --stdin fit``; every line of the form
    ``<path>: fit: set`` contributes ``<path>`` to the result.
    """
    fitSetRgx = re.compile('(.*): fit: set')
    lister = popen('git ls-files -o'.split(), stdout=PIPE)
    checker = popen('git check-attr --stdin fit'.split(), stdin=lister.stdout, stdout=PIPE)
    # check-attr holds its own copy of the pipe; closing ours lets ls-files
    # receive SIGPIPE if check-attr exits early.
    lister.stdout.close()
    try:
        matches = (fitSetRgx.match(line) for line in checker.stdout)
        return {m.group(1) for m in matches if m}
    finally:
        # Reap both children so they do not linger as zombie processes.
        checker.stdout.close()
        checker.wait()
        lister.wait()
开发者ID:dotstaraj,项目名称:git-fit,代码行数:7,代码来源:changes.py


示例2: _get

def _get(items, store, pp, successes, failures):
    """Fetch every item from the store and record the outcome.

    items yields (filePath, objHash, size) tuples. Each successful transfer
    appends that tuple to successes; each failed one appends filePath to
    failures. Progress is reported through pp. After the loop all successes
    are inserted into the cache.
    """
    if not exists(tempDir):
        mkdir(tempDir)

    for filePath, objHash, size in items:
        pp.newItem(filePath, size)

        # Download into a scratch file first and only move it into place once
        # complete, so an interrupted transfer never leaves a bad object behind.
        scratchFd, scratchPath = mkstemp(dir=tempDir)
        osclose(scratchFd)
        key = store.check('%s/%s' % (objHash[:2], objHash[2:]))

        try:
            transferred = store.get(key, scratchPath, size)
        except:
            transferred = False

        if not (key and transferred):
            pp.updateProgress(size, size, custom_item_string='ERROR')
            failures.append(filePath)
            continue

        pp.updateProgress(size, size)
        popen(['mv', scratchPath, filePath]).wait()
        successes.append((filePath, objHash, size))

    newlyCached = dict((h, (s, f)) for f, h, s in successes)
    cache.insert(newlyCached, inLru=True, progressMsg='Caching newly gotten items')
开发者ID:dotstaraj,项目名称:git-fit,代码行数:27,代码来源:objects.py


示例3: save

def save(fitTrackedData, paths=None, pathArgs=None, forceWrite=False, quiet=False):
    added,removed,stubs = saveItems(fitTrackedData, paths=paths, pathArgs=pathArgs, quiet=quiet)

    if stubs:
        print '\nerror: The following items are empty, zero-byte files and cannot be added to fit:\n'
        for i in sorted(stubs):
            print '  ',i
        print
        return False

    if len(added) + len(removed) > 0 or forceWrite:
        print 'Working-tree changes saved.'
        writeFitFile(fitTrackedData)

    fitFileStatus = getFitFileStatus()
    if len(fitFileStatus) == 0 or fitFileStatus[1] == ' ':
        return True

    oldStagedFitFileHash = None
    newStagedFitFileHash = None
    if fitFileStatus[0] == 'A':
        oldStagedFitFileHash = getStagedFitFileHash()
    popen('git add -f'.split()+[fitFile]).wait()
    newStagedFitFileHash = getStagedFitFileHash()
    print 'Staged .fit file.'

    if oldStagedFitFileHash != newStagedFitFileHash:
        _saveToCache(added, fitTrackedData, newStagedFitFileHash)

    return True
开发者ID:dotstaraj,项目名称:git-fit,代码行数:30,代码来源:changes.py


示例4: test_returns_usage_information

    def test_returns_usage_information(self):
        """Both help flags of the ``neo`` command must print a usage banner."""
        for flag in ('-h', '--help'):
            # communicate() returns (stdout, stderr); only stdout is piped here
            captured = popen(['neo', flag], stdout=PIPE).communicate()[0]
            assert 'Usage:' in str(captured)
开发者ID:sdmoko,项目名称:neo-cli,代码行数:7,代码来源:test_help.py


示例5: edit

    def edit(self):
        """Open the mark file in the configured editor and report whether it is unchanged.

        Returns:
            True when the file's digest is the same before and after the
            editor ran, False when the content changed.
        """
        def checkhash(path):
            # Close the handle deterministically instead of leaking it.
            with open(path, 'rb') as handle:
                return xhash(handle.read()).hexdigest()

        digestBefore = checkhash(self.markfile)

        command = (self.editor, self.markfile)
        # FIXME: with Gvim the call does not wait for the editor and
        # execution continues straight away.
        try:
            popen(command).wait()
        except Exception:
            # Fall back to system() with the joined command line.
            system(' '.join(command))

        return digestBefore == checkhash(self.markfile)
开发者ID:ZWindL,项目名称:icebox,代码行数:9,代码来源:icebox.py


示例6: stop

def stop(proc_list):
    '''
       Stop the ChucK process

       Runs ``kill <pid>`` for every entry of proc_list. Every entry is
       attempted before any error is reported.

       Raises:
           Exception: if any ``kill`` invocation wrote to stderr; the message
               joins all collected error texts.
    '''
    from subprocess import PIPE, Popen

    errors = []
    for p in proc_list:
        # argv list instead of a concatenated string: no shell involved
        killer = Popen(['kill', str(p)], stdout=PIPE, stderr=PIPE)
        err = killer.communicate()[1]
        if err:
            errors.append(err.decode('utf-8', 'replace').strip())

    if errors:
        raise Exception('; '.join(errors))
开发者ID:iaine,项目名称:sonification,代码行数:10,代码来源:supervisor.py


示例7: generator

def generator(start_userid, num):
    """Start one ``event_gen_worker.py`` process per user found by a table scan.

    Args:
        start_userid: row key at which the scan of ``user_info`` starts.
        num: maximum number of rows to scan.

    Returns:
        start_userid, unchanged.
    """
    table = connection.table('user_info')
    # Collect the keys first so the scan is finished before workers start.
    user_ids = [key for key, _ in table.scan(row_start=start_userid, limit=num)]

    for user_id in user_ids:
        # Popen expects the whole argv as one list; the workers are not waited on.
        subprocess.Popen(['event_gen_worker.py', user_id])

    return start_userid
开发者ID:talldave,项目名称:MusicBox,代码行数:12,代码来源:event_gen_master.py


示例8: _run

def _run(command, commun=True):
    """Execute a command line.

    Args:
        command: the command passed straight to Popen.
        commun: when true, capture the command's stdout and wait for it to
            finish; otherwise just start the process.

    Returns:
        The captured stdout when commun is true, otherwise ''.

    Raises:
        OSError: re-raised after being logged when the process cannot be started.
    """
    from subprocess import PIPE, Popen as popen
    output = ''
    try:
        if commun:
            output = popen(command, stdout=PIPE).communicate()[0]
        else:
            popen(command)
    # WindowsError only exists on Windows and derives from OSError; naming
    # OSError avoids a NameError masking the real failure on other platforms.
    except OSError as e:
        _log.error('Windows Error: %s', str(e))
        raise
    return output
开发者ID:TobiasAAM,项目名称:scala-tests,代码行数:13,代码来源:scalaprov.py


示例9: write_eeprom_content

def write_eeprom_content(mfserial, orig_line, rvbd_sec, replace=False):
    # Write the binary file with RVBD serial back into EEPROM
    global TMP_OFILE

    output_str = ''
    count = 0

    replace_str = "%s%s" % (rvbd_sec, mfserial)
    output_str += replace_str
    count += len(replace_str)

    # Final padding
    for n in range(len(orig_line) - count):
        output_str += chr(255)

    f = open(TMP_OFILE, 'wb')
    f.write(output_str)
    f.close()
    
    i = 0
    err = False
    while i < 5:
        # Time to write it back to the EEPROM image
        dummy, output = subprocess.popen(EEPROM_WRITE)
        output = output.read().strip()
        if None == search(compile("Size to Write[\s]+: [\d]+ bytes$"), output):
            print "Could not write the EEPROM data, retrying"
            i += 1
            sleep (5)
            err = True
        else:
            if None != search(compile("FRU Read failed"), output):
                #retry as likely ipmitool timed out
                print "ipmitool timeout encountered, retrying"
                i += 1
                sleep (5)
                err = True
            else:
                err = False
                break

    if err:
        print "Cannot write back to the EEPROM, exiting"
        sys.exit(1)

    # clear out the files
    command = '/bin/rm -f %s' % TMP_OFILE
    dummy, output = subprocess.popen(command)
开发者ID:akkmzack,项目名称:RIOS-8.5,代码行数:48,代码来源:write_eeprom_serial.py


示例10: set_eeprom_state

def set_eeprom_state(state="RO"):
    # Set the EEPROM to RW and RO as desired
    command = EEPROM_STATE
        
    if "RO" == state:
        command += "0x37"
    else:
        command += "0xFB"
    
    i = 0
    err = False
    while i < 5:
        dummy, output = subprocess.popen(command)
        output = output.read().strip()
        if "" != output:
            print "Can't set EEPROM to %s, retrying" % state
            i += 1
            sleep (5)
            err = True
        else:
            err = False
            break
            
    if err:
        print "Could not change EEPROM state, exiting"
        sys.exit(1)
开发者ID:akkmzack,项目名称:RIOS-8.5,代码行数:26,代码来源:write_eeprom_serial.py


示例11: run_devserver

def run_devserver():
    """just for dev: run the script named in sys.argv[1] and restart it on change.

    The script is started with ``python <filename>``. Its directory is walked
    repeatedly; as soon as a ``.py`` file (whose name does not contain
    ``_html``) has a modification time newer than the last (re)start, the
    child is terminated and started again. Ctrl-C terminates the child and
    exits with status 0. Without a script argument a usage hint is printed
    and the process exits with status 0.
    """
    from subprocess import Popen as popen

    # Indexing sys.argv[1] directly raises IndexError when no script is
    # given, so check the length before reading it.
    if len(sys.argv) < 2 or not sys.argv[1]:
        print('use command like: python soxo.py ./wsgi.py')
        sys.exit(0)
    filename = sys.argv[1]
    begin_time = time.time()
    dirname = os.path.dirname(filename) or './'

    def is_file_modify(dirname):
        """Return True if a watched .py file under dirname is newer than begin_time."""
        for root, _dirs, files in os.walk(dirname):
            for f in files:
                if os.path.splitext(f)[1] != '.py' or '_html' in f:
                    continue
                try:
                    mtime = os.stat(root + '/' + f).st_mtime
                except OSError:
                    # file vanished between the walk and the stat
                    continue
                if mtime > begin_time:
                    return True
        return False

    #watcher
    while True:
        p = popen(['python', filename])
        try:
            while True:
                #if any file change, reload
                if is_file_modify(dirname):
                    p.terminate()
                    p.wait()  # reap the old server before starting a new one
                    begin_time = time.time()
                    print('some file change, server reloading...')
                    break
                time.sleep(0.01)
        except KeyboardInterrupt:
            p.terminate()
            p.wait()
            print('\nterminate %s' % str(p))
            sys.exit(0)
开发者ID:alphachoi,项目名称:soxo,代码行数:31,代码来源:soxo.py


示例12: printstation

def printstation(file, verbose):
    """
    printstation()
      - file:    path to file being written by pianobar
      - verbose: setting to print headers

      this method attempts to parse out the most recently-selected station.  this
      content is buried deep within the file; this method runs "grep" on the command-line,
      extracting all strings with "station" in them.  the last instance of this string
      (the text after the final carriage return of grep's output) is printed to stdout.

      with verbose set, the "|>" marker and any parenthesised part are removed and
      the remaining line is printed; otherwise only the text between the first pair
      of double quotes is printed.  when no station is found, default_msg is printed.
    """
    p = subprocess.Popen(["grep", "station", file], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    result = p.communicate()[0]
    # communicate() yields bytes on Python 3; decode so the str operations
    # below work.  universal_newlines is deliberately NOT used because it
    # would translate the "\r" separators this parser relies on.
    if not isinstance(result, str):
        result = result.decode("utf-8", "replace")
    output = result.split("\r")[-1]

    if "station" in output:
        if verbose:
            output = output.replace("|>", "")
            output = re.sub(r"\(.*\)", "", output)
            output = output.strip()
            print(output)
        else:
            output = output.split("\"")[1]
            print(output)
    else:
        print(default_msg)
开发者ID:jrlusby,项目名称:dotfiles,代码行数:27,代码来源:monitor-music.py


示例13: bctest

def bctest(testdir, testobj, exeext):
	"""Run one test case and exit the interpreter with status 1 on any failure.

	Args:
		testdir: directory holding the optional input / expected-output files.
		testobj: dict with 'exec' and 'args', and optionally 'input',
			'output_cmp' and 'return_code' (expected exit status, default 0).
		exeext: suffix appended to testobj['exec'] to form the program name.

	The program's stdout must equal the content of the 'output_cmp' file
	(when given and non-empty) and its exit status must equal the expected
	return code.
	"""
	execprog = testobj['exec'] + exeext
	execargs = testobj['args']
	execrun = [execprog] + execargs
	stdincfg = None
	inputdata = None
	if "input" in testobj:
		filename = testdir + "/" + testobj['input']
		with open(filename) as infile:
			inputdata = infile.read()
		stdincfg = subprocess.PIPE

	outputfn = None
	outputdata = None
	if "output_cmp" in testobj:
		outputfn = testobj['output_cmp']
		with open(testdir + "/" + outputfn) as cmpfile:
			outputdata = cmpfile.read()

	try:
		# Popen itself raises OSError when the program cannot be started,
		# so it has to live inside the try as well.
		proc = subprocess.Popen(execrun, stdin=stdincfg, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
		outs = proc.communicate(input=inputdata)
	except OSError:
		print("oserror, failed to execute " + execprog)
		sys.exit(1)

	if outputdata and (outs[0] != outputdata):
		print("output data mismatch for " + outputfn)
		sys.exit(1)

	wantrc = 0
	if "return_code" in testobj:
		wantrc = testobj['return_code']
	if proc.returncode != wantrc:
		# outputfn is None when the test has no 'output_cmp'; fall back to
		# the program name instead of failing on str + None.
		print("return code mismatch for " + (outputfn or execprog))
		sys.exit(1)
开发者ID:moorecoin,项目名称:MooreCoinMiningAlgorithm,代码行数:34,代码来源:bctest.py


示例14: cheetah

def cheetah(template,run=True,block=True):
    '''
    run shell command after substituting variables from calling function
    using cheetah template engine
    uses @ as the variable start token to make writing
    shell commands easier
    '''
    
    #get local variables from calling function
    var = inspect.currentframe().f_back.f_locals

    #change the 'variable start token' to something more shell-friendly
    template = '#compiler-settings\ncheetahVarStartToken = @\n#end compiler-settings'\
             + template

    cmd = str(Template(template, searchList=[var]))
    
    if run == False:
        #don't run just print the command
        print cmd
        return 0
        
    #run command in a subshell
    p = popen(cmd,shell=True)
    
    if block == True:
        #wait for the command to complete
        p.wait()
        assert p.returncode == 0
        return p.returncode
        
    #do not wait for the command to complete
    return p
开发者ID:robertvi,项目名称:rjv,代码行数:33,代码来源:cheetah.py


示例15: handle

 def handle(self, *args, **options):
     """Fetch every configured URL with wget and print averaged analysis figures.

     The first positional argument, when present, is the number of times the
     whole URL list is fetched (default 1). From each response the JSON text
     captured by json_re is parsed; the averages of the
     'request-to-response-time' and 'sql-query-count' values are written to
     self.stdout. Responses without a JSON match are reported and skipped.
     """
     loops = int(args[0]) if len(args) >= 1 else 1
     self.stdout.write("Analysing FHBRS-Application ...\r\n")
     results = []
     for _ in xrange(loops):
         for wget_url in wget_urls:
             self.stdout.write("\tFetching '%s' ...\r\n" % (wget_url))
             wget_cmd = [wget_bin.strip()] + wget_args + [wget_url]
             # communicate() drains stdout AND waits for wget, so the child
             # does not linger as a zombie.
             wget_out = popen(wget_cmd, stdout=PIPE).communicate()[0]
             json_match = json_re.search(wget_out)
             if json_match is None:
                 # Without a match there is nothing to parse for this URL.
                 self.stdout.write("\tNo analysis data found in response, skipping.\r\n")
                 continue
             result = json.loads(json_match.group(1).strip())
             #self.stdout.write("\tAnalyse-Result:\r\n")
             #for key, value in result.iteritems():
             #    self.stdout.write("\t\t%s: %s\r\n" % (str(key), str(value)))
             results.append(result)
     self.stdout.write("\r\n")
     if not results:
         # Avoid dividing by zero when nothing was collected.
         self.stdout.write("No results collected, nothing to average.\r\n")
         return
     render_time_key = 'request-to-response-time'
     render_times = [result[render_time_key] for result in results]
     # float() keeps the average exact even when all values are integers
     render_time_avg = float(sum(render_times)) / len(render_times)
     self.stdout.write("Analyse-Render-Time-Avg: %f\r\n" % (render_time_avg))
     sql_query_count_key = 'sql-query-count'
     sql_query_counts = [result[sql_query_count_key] for result in results]
     sql_query_count_avg = float(sum(sql_query_counts)) / len(sql_query_counts)
     self.stdout.write("Analyse-SQL-Query-Count-Avg: %f\r\n" % (sql_query_count_avg))
     return
开发者ID:jk779,项目名称:kss5-django,代码行数:29,代码来源:analyse.py


示例16: getDataStore

def getDataStore(progressCallback):
    """Import the data store module named in git config and instantiate its Store.

    fit.datastore.moduleName names the module to import; a non-empty
    fit.datastore.modulePath is appended to sys.path first. Raises an
    Exception when no module name is configured; any failure while
    importing or constructing the store is reported and re-raised.
    """
    def readConfig(cmd):
        # stdout of the git command with surrounding whitespace removed
        return popen(cmd.split(), stdout=PIPE).communicate()[0].strip()

    moduleName = readConfig('git config fit.datastore.moduleName')
    modulePath = readConfig('git config fit.datastore.modulePath')

    if not moduleName:
        raise Exception('error: No external data store is configured. Check the fit.datastore keys in git config.')

    if modulePath:
        import sys
        sys.path.append(modulePath)

    try:
        from importlib import import_module
        storeModule = import_module(moduleName)
        return storeModule.Store(progressCallback)
    except Exception:
        print 'error: Could not load the data store configured in fit.datastore.'
        raise
开发者ID:dotstaraj,项目名称:git-fit,代码行数:17,代码来源:objects.py


示例17: _getWorkingTreeStateForLastHead

def _getWorkingTreeStateForLastHead(fitData, fitManifestChanges):
    """Run saveItems against the previous HEAD's versions of the changed paths.

    When fitManifestChanges is empty, fitData is returned untouched.
    Otherwise '.fit' is removed from fitManifestChanges (in place), the
    remaining paths are checked out from ``HEAD@{1}``, saveItems is run
    quietly on fitData, and the paths are restored from ``HEAD`` even when
    saveItems raises.

    Returns:
        fitData
    """
    if not fitManifestChanges:
        return fitData

    if '.fit' in fitManifestChanges:
        fitManifestChanges.remove('.fit')

    changedPaths = list(fitManifestChanges)

    def checkout(ref):
        # One devnull handle shared by stdout and stderr, closed once git is done.
        with open(devnull, 'wb') as sink:
            popen(['git', 'checkout', ref] + changedPaths, stdout=sink, stderr=sink).wait()

    if changedPaths:
        # HEAD@{1} is the reflog entry for the previous position of HEAD.
        checkout('HEAD@{1}')
    try:
        saveItems(fitData, quiet=True)
    finally:
        if changedPaths:
            checkout('HEAD')
    return fitData
开发者ID:dotstaraj,项目名称:git-fit,代码行数:17,代码来源:hooks.py


示例18: foo

def foo(command):
    """Subprocess wrapper: start a command without a shell.

    Args:
        command: command line as a single string; it is split with
            shlex.split before being handed to Popen.

    The process is started and not waited for. A failure to start it is
    reported on stdout instead of being raised.
    """
    argv = shlex.split(command)
    try:
        subprocess.Popen(argv)
    except Exception as e:
        print("[!] Error while executing system command: %s" % e)
开发者ID:Antimoon,项目名称:Smartphone-Pentest-Framework,代码行数:8,代码来源:main.py


示例19: execute_command

  def execute_command(self, command, match, prefix, target):
    """Run a configured command as a child process and relay its stdout to target.

    Args:
      command: mapping with an 'argv' list and an optional 'env' mapping.
      match: regex match or a falsy value; a non-empty 'args' group is
        appended to the argv as one extra argument.
      prefix: sender prefix; the part before the first '!' is exported to the
        child as '_from', the whole prefix as '_prefix'.
      target: list of recipients; when it contains our own nickname, that
        entry is replaced in place by the sender.

    Each stdout line of the child is sent with PRIVMSG. A failure to start
    the child, or a non-zero exit status, is announced with ME.
    """
    from os.path import realpath, dirname, join
    from subprocess import Popen as popen, PIPE
    from time import time

    #TODO: allow only commands below ./commands/
    exe = join(dirname(realpath(dirname(__file__))), command['argv'][0])
    myargv = [exe] + command['argv'][1:]
    try:
      if match and match.groupdict().get('args', None):
        myargv += [match.groupdict()['args']]
    except Exception:
      # A bare except would also swallow KeyboardInterrupt / SystemExit.
      log.info("cannot parse args!")

    cwd = self.getconf('workdir')
    if not os.access(cwd,os.W_OK):
      log.error("Workdir '%s' is not Writable! Falling back to root dir"%cwd)
      cwd = "/"

    env = {}

    env.update(os.environ) # first merge os.environ
    env.update(command.get('env', {})) # then env of cfg

    env['_prefix'] = prefix
    env['_from'] = prefix.split('!', 1)[0]

    log.debug('self:' +self.nickname)
    # when receiving /query, answer to the user, not to self
    if self.nickname in target:
      target.remove(self.nickname)
      target.append(env['_from'])
    log.debug('target:' +str(target))

    start = time()
    try:
      log.debug("Running : %s"%str(myargv))
      log.debug("Environ : %s"%(str(env)))
      p = popen(myargv, bufsize=1, stdout=PIPE, stderr=PIPE, env=env, cwd=cwd)
    except Exception as error:
      self.ME(target, 'brain damaged')
      # NOTE(review): the prefix had been mangled into '[email protected]' by
      # an e-mail obfuscation filter; presumably it was 'OSError@' - confirm
      # against the upstream file.
      log.error('OSError@%s: %s' % (myargv, error))
      return
    pid = p.pid
    # NOTE(review): stderr is only read after the child has exited; a child
    # that writes a lot to stderr could block on a full pipe.
    for line in iter(p.stdout.readline, ''.encode()):
      try:
        self.PRIVMSG(target, translate_colors(line.decode()))
      except Exception as error:
        log.error('no send: %s' % error)
      log.debug('%s stdout: %s' % (pid, line))
    p.wait()
    elapsed = time() - start
    code = p.returncode
    log.info('command: %s -> %s in %d seconds' % (myargv, code, elapsed))
    for errline in p.stderr.readlines():
      log.debug('%s stderr: %s' % (pid, errline))

    if code != 0:
      self.ME(target, 'mimimi')
开发者ID:krebscode,项目名称:Reaktor,代码行数:58,代码来源:core.py


示例20: getStagedOffenders

def getStagedOffenders():
    """Classify newly staged files by their ``fit`` attribute.

    The names printed by ``git diff --name-only --diff-filter=A --cached``
    are piped into ``git check-attr --stdin fit``. Paths reported as
    ``set`` are collected as fit conflicts; paths reported as
    ``unspecified`` are passed to filterBinaryFiles.

    Returns:
        (set of fit-conflict paths, set of what filterBinaryFiles returned)
    """
    proc = popen('git diff --name-only --diff-filter=A --cached'.split(), stdout=PIPE)
    proc = popen('git check-attr --stdin fit'.split(), stdin=proc.stdout, stdout=PIPE)

    conflicting = []
    unspecified = []
    for line in proc.stdout:
        # everything before the first ':' is taken as the path
        path = line[:line.find(':')]
        if line.endswith(' set\n'):
            conflicting.append(path)
        elif line.endswith(' unspecified\n'):
            unspecified.append(path)

    binaries = filterBinaryFiles(unspecified) if len(unspecified) > 0 else []

    return set(conflicting), set(binaries)
开发者ID:dotstaraj,项目名称:git-fit,代码行数:18,代码来源:changes.py



注:本文中的subprocess.popen函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python subprocess.process函数代码示例发布时间:2022-05-27
下一篇:
Python subprocess.list2cmdline函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap