• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python mmap.mmap函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 mmap.mmap 函数的典型用法代码示例。如果您正苦于以下问题:Python mmap 函数的具体用法是什么?Python mmap 怎么用?有哪些 Python mmap 的使用例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



下文一共展示了 mmap 函数的 20 个代码示例,这些示例默认按受欢迎程度排序。您可以为喜欢或者觉得有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: barebox_overlay_mbr

def barebox_overlay_mbr(fd_barebox, fd_hd):
    """Overlay a barebox image onto the beginning of a disk drive image.

    Both files are memory-mapped. The boot code in front of the partition
    table and everything from the second sector onwards are copied from the
    barebox image into the disk image, then the disk image is patched via
    ``fill_daps`` and ``barebox_linear_image``.

    :param fd_barebox: open file object of the barebox image (mapped read-only)
    :param fd_hd: open file object of the disk drive image (mapped writable)
    :return: ``False`` when ``barebox_linear_image`` reports failure,
        otherwise ``None``.
        NOTE(review): the success value is falsy as well -- confirm that
        callers do not test the result with ``if not ...``.
    """
    import mmap
    import os

    sb = os.fstat(fd_barebox.fileno())
    barebox_image = mmap.mmap(fd_barebox.fileno(), 0, access=mmap.ACCESS_READ)
    try:
        check_for_valid_mbr(barebox_image, sb.st_size)

        required_size = sb.st_size
        hd_image = mmap.mmap(fd_hd.fileno(), required_size, access=mmap.ACCESS_WRITE)
        try:
            check_for_space(hd_image, required_size)

            # embed barebox's boot code into the disk drive image
            hd_image[0:OFFSET_OF_PARTITION_TABLE] = barebox_image[0:OFFSET_OF_PARTITION_TABLE]

            # embed the barebox main image into the disk drive image,
            # but keep the persistent environment storage untouched
            # (if defined), e.g. store the main image behind this special area.
            hd_image_start = SECTOR_SIZE
            barebox_image_start = SECTOR_SIZE
            size = sb.st_size - SECTOR_SIZE
            hd_image[hd_image_start:hd_image_start + size] = \
                barebox_image[barebox_image_start:barebox_image_start + size]

            embed = PATCH_AREA
            indirect = SECTOR_SIZE

            fill_daps(DAPS(hd_image, embed), 1, INDIRECT_AREA, INDIRECT_SEGMENT, 1)

            rc = barebox_linear_image(hd_image, indirect, sb.st_size)
            if not rc:
                return False
        finally:
            # Release the mapping on every path, including the early
            # ``return False`` and exceptions raised by the helpers.
            hd_image.close()
    finally:
        barebox_image.close()
开发者ID:Jokymon,项目名称:izanagi,代码行数:34,代码来源:setupmbr.py


示例2: test_get_stream

    def test_get_stream(self):
        """Check PDFLexer.get_stream on an empty and on a random stream body.

        Each pass writes ``<<>>``, the ``stream`` keyword, a random end-of-line,
        the payload, another random end-of-line and ``endstream`` to a
        temporary file, maps it read-only and verifies the parsed stream.
        """
        # First pass: empty stream; second pass: random payload.
        for with_payload in (False, True):
            with closing(NamedTemporaryFile()) as tmp:
                first_eol = random.choice(('\r\n', '\n'))
                second_eol = random.choice(('\r\n', '\r', '\n'))
                if with_payload:
                    payload = self._rand_string(random.randint(0, 65536))
                else:
                    payload = ''
                tmp.write('<<>>\nstream' + first_eol + payload + second_eol + 'endstream')
                tmp.flush()

                with closing(mmap.mmap(tmp.fileno(), 0, prot=mmap.PROT_READ)) as stream:
                    lexer = PDFLexer(stream)
                    parsed = lexer.get_stream(0)
                    expected_end = (5 + 6 + len(first_eol) + len(payload)
                                    + len(second_eol) + 8 + 1)
                    assert parsed.data == payload
                    assert parsed.start_pos == 0
                    assert parsed.end_pos == expected_end
                    assert parsed.stream_dict.data == {}
开发者ID:wildmb,项目名称:pdfproto,代码行数:31,代码来源:test_PDFLexer.py


示例3: file_contents_ro

def file_contents_ro(fd, stream=False, allow_mmap=True):
    """:return: read-only contents of the file represented by the file descriptor fd

    :param fd: file descriptor opened for reading
    :param stream: if False, random access is provided, otherwise the stream interface
        is provided.
    :param allow_mmap: if True, it is allowed to map the contents into memory, which
        allows large files to be handled and accessed efficiently. The file descriptor
        will change its position if this is False (or if mapping is not possible
        and the contents have to be read instead)"""
    if allow_mmap:
        try:
            # supports stream and random access
            try:
                return mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
            except EnvironmentError:
                # python 2.4 issue, 0 wants to be the actual size
                return mmap.mmap(fd, os.fstat(fd).st_size, access=mmap.ACCESS_READ)
            # END handle python 2.4
        except (EnvironmentError, ValueError):
            # An empty file cannot be mapped (mmap raises ValueError for it);
            # like any other mapping failure this falls back to a plain read.
            pass
        # END exception handling
    # END handle mmap

    # read manually
    contents = os.read(fd, os.fstat(fd).st_size)
    if stream:
        return _RandomAccessBytesIO(contents)
    return contents
开发者ID:Kronuz,项目名称:gitdb,代码行数:27,代码来源:util.py


示例4: create_jpeg_from_itc

def create_jpeg_from_itc(artwork_file):
    """Strip the leading ITC metadata from an .itc file and rename it to .jpeg.

    The first ``JPEG_SIGNATURE_OFFSET`` bytes are dropped in place (the rest of
    the file is moved to offset 0 and the file is truncated). The file is then
    renamed, inside its own directory, to
    ``artwork_name_prefix + str(artwork_item_count) + ".jpeg"``.

    Side effects: increments the module-level ``artwork_item_count``. On any
    failure an error message is written to stderr and the process exits with
    status -1.

    :param artwork_file: path of the .itc file to convert
    """

    global artwork_item_count
    global artwork_name_prefix

    try:
        artwork_item_count += 1

        # Binary mode: the file is only mapped and truncated, never decoded.
        with open(artwork_file, "r+b") as itc_file_handle:
            byte_data = mmap.mmap(itc_file_handle.fileno(), 0)
            try:
                new_size = len(byte_data) - JPEG_SIGNATURE_OFFSET
                if new_size <= 0:
                    # Nothing would be left after the header; treat as failure
                    # instead of producing an empty .jpeg.
                    raise ValueError("file holds no data after the ITC header")

                # Drop the ITC metadata that we don't need for now
                byte_data.move(0, JPEG_SIGNATURE_OFFSET, new_size)
                byte_data.flush()
            finally:
                # The mapping must be closed before the file is truncated.
                byte_data.close()
            itc_file_handle.truncate(new_size)

        # Only the file name changes; the directory part is kept untouched
        # (a plain str.replace('.itc', ...) would also rewrite directory names).
        jpeg_name = artwork_name_prefix + str(artwork_item_count) + ".jpeg"
        jpeg_file = os.path.join(os.path.dirname(artwork_file), jpeg_name)

        os.rename(artwork_file, jpeg_file)
    except Exception:
        # ``Exception`` rather than a bare except, so Ctrl-C and SystemExit
        # are not reported as conversion failures.
        sys.stderr.write("Error: could not convert %s to JPEG." % str(artwork_file))
        sys.exit(-1)
开发者ID:irvingruan,项目名称:Matisse,代码行数:32,代码来源:Matisse.py


示例5: __init__

 def __init__(self):
     """Open the three tagged mappings and expose them as ctypes structures.

     Each mapping is created with file number 0, the size of the matching
     structure and a tag name; the structure views are then built directly
     on top of the mapped buffers with ``from_buffer``.
     """
     physics_size = ctypes.sizeof(SPageFilePhysics)
     self._acpmf_physics = mmap.mmap(0, physics_size, "acpmf_physics")
     graphics_size = ctypes.sizeof(SPageFileGraphic)
     self._acpmf_graphics = mmap.mmap(0, graphics_size, "acpmf_graphics")
     static_size = ctypes.sizeof(SPageFileStatic)
     self._acpmf_static = mmap.mmap(0, static_size, "acpmf_static")

     # Structure views share memory with the mappings created above.
     self.physics = SPageFilePhysics.from_buffer(self._acpmf_physics)
     self.graphics = SPageFileGraphic.from_buffer(self._acpmf_graphics)
     self.static = SPageFileStatic.from_buffer(self._acpmf_static)
开发者ID:ev-agelos,项目名称:AC-dashes-and-times-ranking,代码行数:7,代码来源:sim_info.py


示例6: check_trajectory_file_type

def check_trajectory_file_type(file_name, bytes_to_check=1000000):
    """Detect the trajectory format of a file and return the matching reader.

    The beginning of the file is searched for marker strings. The process is
    terminated through ``exit()`` when the file does not exist or when no
    known format is recognized.

    :param file_name: path of the trajectory file
    :param bytes_to_check: number of leading bytes to inspect; when the file is
        not larger than this, the whole file is inspected
    :return: ``read_lammps_trajectory`` or ``read_vasp_trajectory``
    """

    # Check file exists
    if not os.path.isfile(file_name):
        print(file_name + ' file does not exists')
        exit()

    file_size = os.path.getsize(file_name)
    # Mapping more bytes than the file holds fails (or grows the file on
    # Windows). A length of 0 asks mmap for the whole file, so that is used
    # whenever the requested window does not fit.
    length = bytes_to_check if 0 < bytes_to_check < file_size else 0

    # An empty file cannot be mapped and cannot match any format.
    if file_size > 0:
        # Read-only access is enough: the file is only searched.
        with open(file_name, "rb") as f:
            file_map = mmap.mmap(f.fileno(), length, access=mmap.ACCESS_READ)
            try:
                # Check if LAMMPS file
                is_lammps = -1 not in [file_map.find(b'ITEM: TIMESTEP'),
                                       file_map.find(b'ITEM: NUMBER OF ATOMS'),
                                       file_map.find(b'ITEM: BOX BOUNDS')]

                # Check if VASP file
                is_vasp = -1 not in [file_map.find(b'NIONS'),
                                     file_map.find(b'POMASS'),
                                     file_map.find(b'direct lattice vectors')]
            finally:
                file_map.close()

        if is_lammps:
            return read_lammps_trajectory
        if is_vasp:
            return read_vasp_trajectory

    print('Trajectory file not recognized')
    exit()
开发者ID:santiama,项目名称:DynaPhoPy,代码行数:34,代码来源:iofile.py


示例7: loadDocs

def loadDocs(path):
    """Load a document collection from a text file using memory-mapped I/O.

    Every line describes one document: everything up to and including the
    first space is skipped, the rest is a space separated list of
    ``<int>:<int>`` pairs.

    :param path: path of the file to read (must not be empty, since an empty
        file cannot be memory-mapped)
    :return: list with one ``(array_of_first_ints, array_of_second_ints)``
        tuple per line, in file order
    """
    print("Loading docs ...")
    # Read-only access is sufficient; a single mapping serves both passes
    # and is always closed again.
    with open(path, "rb") as f:
        m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        try:
            # first pass: get number of lines
            numLines = 0
            while m.readline() != b'':
                numLines += 1
            print(str(numLines) + " docs to load.")
            docs = numLines * [None]

            # second pass: read the docs in
            m.seek(0)
            for i in range(numLines):
                line = m.readline().strip()
                # drop the leading field in front of the first space
                line = line[line.find(b' ') + 1:]
                pairs = [p.split(b":") for p in line.split(b" ")]
                docs[i] = (n.array([int(p[0]) for p in pairs]),
                           n.array([int(p[1]) for p in pairs]))
        finally:
            m.close()
    print("done.")
    return docs
开发者ID:Syed-Arshad,项目名称:streaming_vb,代码行数:31,代码来源:archived_dataset.py


示例8: mergeTermFiles

def mergeTermFiles():
    """Merge the per-term files into the terms file and write the term index.

    The names found in ``constants.termsDir`` are extended through
    ``addDummyTerms`` and sorted. For every name that does not start with
    ``constants.underscore`` the file content is written into the memory-mapped
    ``constants.termsFile`` (which is grown by the size of the term file
    first), the name is appended to ``constants.termsListFile`` and an index
    entry with the byte range is written to ``constants.fSortedTermIndex``.
    Names starting with the underscore only get an index entry with range 0-0.
    """
    files = os.listdir(constants.termsDir)
    linesinFile = len(files)
    linesToAdd = getLinesToAdd(linesinFile)
    files = addDummyTerms(files, linesToAdd)
    files.sort()

    with open(constants.termsFile, "r+b") as f:
        # not called ``map`` so the builtin stays usable
        merged = mmap.mmap(f.fileno(), 0)
        try:
            with open(constants.termsListFile, "w") as fr, \
                    open(constants.fSortedTermIndex, "wb") as f2:
                byteLen = 0
                for filex in files:
                    if filex[0:1] != constants.underscore:
                        fr.write(filex + constants.space)
                        # Close each term file and its mapping right away;
                        # otherwise one descriptor per term stays open.
                        with open(constants.termsDir + "/" + filex, "r+b") as fx:
                            term_map = mmap.mmap(fx.fileno(), 0)
                            try:
                                merged.resize(merged.size() + term_map.size())
                                merged.write(term_map[0:])
                                Str = makeTermStr(filex, byteLen, byteLen + term_map.size())
                                byteLen = byteLen + term_map.size()
                            finally:
                                term_map.close()
                    else:
                        Str = makeTermStr(filex, 0, 0)

                    f2.write(Str.encode(constants.encoding))
        finally:
            merged.close()
开发者ID:shoebmogal,项目名称:Search-engine,代码行数:34,代码来源:mapCreator.py


示例9: __init__

        def __init__(self, size):
            """Create a shared memory area of ``size`` bytes.

            On Python >= 2.5 an anonymous mapping is used and ``self.name`` is
            ``None``. On older versions a zero-filled temporary file is
            created and mapped; ``self.name`` is its path. The file is
            unlinked immediately, except on cygwin where it is unlinked by a
            finalizer after the mapping has been closed.

            :param size: size of the area in bytes
            """
            if sys.version_info >= (2, 5, 0):
                self.buffer = mmap.mmap(-1, size)
                self.size = size
                self.name = None
            else:
                fd, self.name = tempfile.mkstemp(prefix='pym-')
                self.size = remaining = size
                # os.write may write less than requested, so loop until done
                while remaining > 0:
                    remaining -= os.write(fd, '\0' * remaining)
                self.buffer = mmap.mmap(fd, size)
                os.close(fd)

                if sys.platform == 'cygwin':
                    # cannot unlink file until it is no longer in use
                    def _finalize_heap(mmap, unlink, name):
                        mmap.close()
                        unlink(name)
                    # the path lives in self.name; a bare ``name`` is not
                    # defined in this scope
                    Finalize(
                        self, _finalize_heap,
                        args=(self.buffer, os.unlink, self.name),
                        exitpriority=-10
                        )
                else:
                    os.unlink(self.name)
开发者ID:ConduitTeam,项目名称:hue,代码行数:25,代码来源:heap.py


示例10: test_big_mappings

 def test_big_mappings(self):
     """Map two large, odd-sized regions of the device and check their lengths.

     The second mapping starts at the page offset that follows the pages
     counted by ``num_pages`` for the first mapping.
     """
     first_length = 7 * 1024 * 1024 + 1
     second_length = 12 * 1024 * 1024 + 2
     with SpicommDev() as dev:
         with mmap.mmap(dev, length=first_length, offset=0 * mmap.PAGESIZE) as mm1:
             second_offset = num_pages(len(mm1)) * mmap.PAGESIZE
             with mmap.mmap(dev, length=second_length, offset=second_offset) as mm2:
                 self.assertEqual(len(mm1), first_length)
                 self.assertEqual(len(mm2), second_length)
开发者ID:eosurman,项目名称:physicsc,代码行数:7,代码来源:spicomm_test.py


示例11: _mmap

 def _mmap(self):
   ''' Protected API: build the local mapping on first use and return it.

   When ``self._base`` is still None, the byte range ``self.start`` ..
   ``self.end`` is made available as a ctypes ``c_ubyte`` array and wrapped
   with ``LocalMemoryMapping.fromAddress``. On every call the read helpers
   (readWord, readArray, readBytes, readStruct) of this object are rebound
   to those of ``self._base``.

   :return: ``self._base``
   '''
   # mmap.mmap has a full bytebuffer API, so it can be used as is for the bytebuffer.
   # A ctypes pointer-able instance is needed to read ctypes structures efficiently.
   # Unfortunately a bytebuffer cannot be obtained from that same raw memory.
   # The bytebuffer is not kept in memory, because it is a waste of space in most cases.
   if self._base is None:
     # Hard-coded switch: while it is True, the elif/else branches below never run.
     mmap_hack = True
     if mmap_hack: # XXX highly non-portable hack, see below.
       self._local_mmap_bytebuffer = mmap.mmap(self._memdump.fileno(), self.end-self.start, access=mmap.ACCESS_READ)
       # Read an unsigned int located 8 bytes past the address returned by id()
       # for the mmap object and use it as the address of the mapped data.
       # NOTE(review): presumably this is the data pointer inside CPython's
       # mmap object; both the offset and the c_uint / 'L' sizes depend on the
       # interpreter build and platform -- confirm before relying on it.
       heapmap = struct.unpack('L', (ctypes.c_uint).from_address(id(self._local_mmap_bytebuffer) + 8 ) )[0] 
       self._local_mmap_content = (ctypes.c_ubyte*(self.end-self.start)).from_address(heapmap)
     elif hasattr(self._memdump,'fileno'): # normal file. mmap kinda useless i suppose.
       log.warning('Memory Mapping content mmap-ed() (double copy of %s) : %s'%(self._memdump.__class__, self))
       # map the bytes of the dump file
       local_mmap_bytebuffer = mmap.mmap(self._memdump.fileno(), self.end-self.start, access=mmap.ACCESS_READ)
       # convert them into a ctypes array
       self._local_mmap_content = utils.bytes2array(local_mmap_bytebuffer, ctypes.c_ubyte)
     else: # dumpfile, file inside targz ... any read() API really
       self._local_mmap_content = utils.bytes2array(self._memdump.read(), ctypes.c_ubyte)
       log.warning('Memory Mapping content copied to ctypes array : %s'%(self))
     # build _base on top of the address of the ctypes array
     self._base = LocalMemoryMapping.fromAddress( self, ctypes.addressof(self._local_mmap_content) )
     log.debug('LocalMemoryMapping done.')
   # redirect the read helpers to the local mapping
   self.readWord = self._base.readWord
   self.readArray = self._base.readArray
   self.readBytes = self._base.readBytes
   self.readStruct = self._base.readStruct
   return self._base
开发者ID:f9tech,项目名称:twiler-site-packages,代码行数:31,代码来源:memory_mapping.py


示例12: __init__

    def __init__(self, evtout):
        """Open ``/dev/uio<evtout>`` and map the PRUSS and external RAM regions.

        For each mapped region three ``Mem`` accessors are created with the
        first argument 4, 2 and 1 respectively, all based on the address of
        the mapping.

        :param evtout: number used to build the uio device path
        :raises Exception: when ``detect_hw_version`` returns a negative value
        """
        device_path = format("/dev/uio%d" % evtout)
        self.uio = os.open(device_path, os.O_RDWR | os.O_SYNC, 0)

        self.pruss_phys_base = readhex(pruss_base)
        self.pruss_map_size = readhex(pruss_size)
        self.dataram_base = mmap.mmap(
            self.uio,
            self.pruss_map_size,
            mmap.MAP_SHARED,
            mmap.PROT_READ | mmap.PROT_WRITE,
        )

        # hokey way to get at address of mmap region: take the address of a
        # ctypes byte placed on the first byte of the mapping
        dataram_anchor = ctypes.c_uint8.from_buffer(self.dataram_base)
        dataram_address = ctypes.addressof(dataram_anchor)

        self.drw = Mem(4, dataram_address, self.pruss_map_size)
        self.drs = Mem(2, dataram_address, self.pruss_map_size)
        self.drb = Mem(1, dataram_address, self.pruss_map_size)

        self.version = self.detect_hw_version()
        if self.version < 0:
            raise Exception("cannot detect hardware version")

        self.extram_phys_base = readhex(extram_base)
        self.extram_map_size = readhex(extram_size)
        # NOTE(review): this maps the same descriptor at offset 0 again, just
        # like the mapping above -- confirm that this really selects the
        # external RAM region.
        self.extram = mmap.mmap(
            self.uio,
            self.extram_map_size,
            mmap.MAP_SHARED,
            mmap.PROT_READ | mmap.PROT_WRITE,
        )

        extram_anchor = ctypes.c_uint8.from_buffer(self.extram)
        extram_address = ctypes.addressof(extram_anchor)

        self.erw = Mem(4, extram_address, self.extram_map_size)
        self.ers = Mem(2, extram_address, self.extram_map_size)
        self.erb = Mem(1, extram_address, self.extram_map_size)
开发者ID:13788593535,项目名称:machinekit,代码行数:31,代码来源:pruaccess.py


示例13: fix_savedata

def fix_savedata(dir):
    """Patch SYS.BIN and all SAVE???.BIN files below a save data directory.

    SYS.BIN: 100 records of 0x1258 bytes starting at offset 0x269480 are
    visited; in every record that starts with ``00 00 00 02`` the bytes
    0x18..0x58 are zeroed.

    SAVE???.BIN (searched recursively): the bytes 0x18..0x58 and
    0x46358..0x8A358 are zeroed.

    Problems are reported through ``ErrorMessageBox``; a message box is shown
    when everything has been processed.

    :param dir: path of the save data directory
    """
    if (not os.path.isdir(dir) or not os.path.isfile(dir+"/SYS.BIN") ):
        ErrorMessageBox("Ŀ¼´íÎó")

    import mmap
    fd = os.open(dir+"/SYS.BIN", os.O_RDWR)
    try:
        buf = mmap.mmap(fd, os.fstat(fd).st_size, access=mmap.ACCESS_WRITE)
        try:
            if (buf[0:8] != b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF"):
                print("Bad savedata or not decrypted. SYS.BIN")
                ErrorMessageBox("´æµµ´íÎó")
            # Step record by record. Incrementing the loop variable inside the
            # body has no effect on a ``for`` loop, which made the original
            # test every single byte offset instead of every record start.
            for pos in range(0x269480, 0x269480 + 0x1258 * 100, 0x1258):
                if buf[pos:pos+4] == b"\0\0\0\2":
                    buf[pos+0x18:pos+0x58] = b"\0\0\0\0" * 0x10
        finally:
            # closing the mapping releases it deterministically
            buf.close()
    finally:
        os.close(fd)
    print('Fix SYS.BIN.')

    import fnmatch
    # ``//`` keeps the repeat count an integer on every Python version
    zstr = b"\0\0\0\0" * ((0x8A358 - 0x46358) // 4)
    for directory, subdirectories, files in os.walk(dir):
        for filename in files:
            if fnmatch.fnmatch(filename, 'SAVE???.BIN'):
                fd = os.open(os.path.join(directory, filename), os.O_RDWR)
                try:
                    buf = mmap.mmap(fd, os.fstat(fd).st_size, access=mmap.ACCESS_WRITE)
                    try:
                        if (buf[0:4] != b"\0\0\0\2"):
                            print("Bad savedata or not decrypted. %s" % filename)
                            ErrorMessageBox("´æµµ´íÎó»òδ½âÃÜ")
                        buf[0x18:0x58] = b"\0\0\0\0" * 0x10
                        buf[0x46358:0x8A358] = zstr
                    finally:
                        buf.close()
                finally:
                    os.close(fd)
                print('Fix %s.' % (filename))
    windll.user32.MessageBoxA(None, "´æµµÐÞÕýÍê³É!", EXE_TITLE, 0)
开发者ID:kid23,项目名称:WHITE_ALBUM2,代码行数:32,代码来源:import_file.py


示例14: _sequence2mmap

    def _sequence2mmap(cls, sequence) -> (mmap.mmap, int):  # Final
        """ :return An anonymous mmap storing the bytestring representation of the sequence @sequence, paired with the
        number of elements in the sequence. @sequence needs to either be a bytestring or an iterable containing only
        elements that implement __len__.

        A bytestring is copied as is into a mapping created with the access mode from ``cls._access()``. Any other
        iterable is encoded element by element with ``cls._encode`` into a mapping that starts at one page and is
        replaced by a larger one whenever the encoded data no longer fits. """

        def grow_mmap(old, new_capacity):
            # Copy only the bytes written so far. Copying the whole old mapping would leave the write position at the
            # old capacity instead of at the end of the data, so the next element would land behind a gap of zeros.
            used = old.tell()
            new_m = mmap.mmap(-1, new_capacity)
            new_m.write(old[:used])  # FIXME Potentially large bytestring
            old.close()
            return new_m

        protection = cls._access()
        if isinstance(sequence, bytes):
            m = mmap.mmap(-1, len(sequence), access=protection)
            m.write(sequence)
            return m, len(m)
        capacity = mmap.PAGESIZE  # Initial capacity. Cannot do len(sequence) since it is a generator.
        m = mmap.mmap(-1, capacity)
        currentsize = 0
        element_count = 0
        for element in sequence:
            element_count += 1
            bs = cls._encode(element)
            currentsize += len(bs)
            if currentsize > capacity:
                # Find the final capacity first so the data is copied only once per element.
                while currentsize > capacity:
                    capacity *= 2
                m = grow_mmap(m, capacity)  # Because m.resize() is apparently bugged and causes SIGBUS
            m.write(bs)
        m.resize(currentsize)
        return m, element_count
开发者ID:Sebelino,项目名称:pyromhackit,代码行数:30,代码来源:gmmap.py


示例15: _load

    def _load(self, do_crop=False):
        """
        Convert word file to html and store it in tempfile

        The output of ``wvHtml`` is opened as ``self.html`` and memory-mapped
        as ``self.html_map``; afterwards ``self._fix_images()`` is called.

        params:
            - do_crop: Whether to crop file to content or leave whole html

        raises:
            - ValueError: if do_crop is set and the section markers are
              missing or in the wrong order
        """
        import os  # local import, only needed to release the mkstemp descriptor

        #Get temporary file where wvHtml will store output.
        #mkstemp returns an open descriptor that must be closed by the caller.
        fd, out_file = tempfile.mkstemp()
        os.close(fd)

        #Call wvHtml
        subprocess.check_call(['wvHtml', self.file, out_file])

        #Create mmap object for file (needed with and without cropping)
        self.html = open(out_file, 'r+b')
        self.html_map = mmap.mmap(self.html.fileno(), 0)

        if do_crop:
            begin_marker = b'<!--Section Begins-->'
            end_marker = b'<!--Section Ends-->'
            #Get index of real data section start and end
            begin = self.html_map.find(begin_marker)
            end = self.html_map.rfind(end_marker)
            start = begin + len(begin_marker)
            if begin == -1 or end == -1 or end < start:
                #find() returns -1 for a missing marker; using that value
                #would produce meaningless offsets
                raise ValueError('section markers not found in wvHtml output')
            #Move the section to the front and resize map to new size
            self.html_map.move(0, start, end - start)
            self.html_map.resize(end - start)

        #Fix paths to images
        self._fix_images()
开发者ID:maksbotan,项目名称:gimn-gen,代码行数:32,代码来源:word_importer.py


示例16: get_sinks

def get_sinks(src,video_capture,frame):
    """Create the shared memory mappings for the overlay and raw sinks of a source.

    For each configured sink (overlay first, then raw) the frame bytes are
    written to ``<PROJECT_ROOT>/run/sinks/<short_id>`` and that file is mapped
    with MAP_SHARED / PROT_WRITE. When writing fails, the capture is released
    and the source is re-opened through ``open_source``.

    :param src: source object providing ``overlay_sink`` and ``raw_sink``
    :param video_capture: capture object, released when a write fails
    :param frame: frame object providing ``tostring()``
    :return: tuple ``(raw, overlay, video_capture)``; a mapping is None when
        the corresponding sink is not set
    """
    mappings = {}
    for sink_attr in ("overlay_sink", "raw_sink"):
        sink = getattr(src, sink_attr)
        if not sink:
            mappings[sink_attr] = None
            continue

        sink_path = os.path.join(settings.PROJECT_ROOT, "run", "sinks", sink.short_id)
        sink_file = open(sink_path, 'w+b')
        sink_file.seek(0, os.SEEK_SET)
        try:
            sink_file.write(frame.tostring())
        except Exception:
            # writing failed: restart the source and continue with its frame
            video_capture.release()
            frame, video_capture = open_source(src)
        sink_file.seek(0, os.SEEK_SET)

        frame_size = len(frame.tostring())
        mappings[sink_attr] = mmap.mmap(sink_file.fileno(), frame_size,
                                        mmap.MAP_SHARED, prot=mmap.PROT_WRITE)

    return mappings["raw_sink"], mappings["overlay_sink"], video_capture
开发者ID:gat3way,项目名称:ocv,代码行数:35,代码来源:common.py


示例17: initshm

 def initshm(self, count = 0):
     """Attach to, or create, the named semaphore and shared memory segment.

     First tries to create the semaphore ``self.name`` exclusively. If it
     already exists, the existing shared memory and semaphore are opened
     instead; if the shared memory is missing, the call is retried after a
     short sleep. If the semaphore could be created, the shared memory is
     created too and initialised with a pickled ``{"this": (0, 1)}``.

     :param count: number of retries done so far (used by the recursion)
     :return: tuple ``(shm, mem, sem)`` -- shared memory object, its mmap,
         and the semaphore
     :raises ValueError: if the pickled initial content is larger than
         ``self.memlen``
     """
     if count >= 3:
         # Well, we tried. Likely a previous owner died without cleaning up:
         # remove the stale semaphore and start over.
         sem = posix_ipc.Semaphore(self.name)
         sem.unlink()
         sem.close()
         del sem
         count = 0
     try:
         # O_CREAT|O_EXCL: only succeeds if the semaphore does not exist yet
         sem = posix_ipc.Semaphore(self.name, flags=os.O_CREAT|os.O_EXCL)
     except ExistentialError:
         try:
             shm = posix_ipc.SharedMemory(self.name)
         except ExistentialError:
             # So, the semaphore exists but the shared memory does not. Clearly things
             # are rotten in the state of Denmark.
             # We'll try again a few more times; perhaps we unluckily caught another
             # instance in the middle of creating the shm.
             time.sleep(.25)
             return self.initshm(count + 1)
         sem = posix_ipc.Semaphore(self.name)
         # presumably a 5 second timeout -- TODO confirm against posix_ipc docs
         sem.acquire(5)
         mem = mmap.mmap(shm.fd, 0)
         return (shm, mem, sem)
     # We created the semaphore, so create and initialise the shared memory as well.
     shm = posix_ipc.SharedMemory(self.name, flags=os.O_CREAT|os.O_EXCL, size=self.memlen)
     mem = mmap.mmap(shm.fd, 0)
     initial = pickle.dumps({"this": (0, 1)})
     if len(initial) > self.memlen:
         raise ValueError("Your memlen is too small")
     mem.write(initial)
     # rewind so the mapping is positioned at the start again
     mem.seek(0)
     return (shm, mem, sem)
开发者ID:mdlowman,项目名称:oniichan,代码行数:32,代码来源:reddit.py


示例18: init

    def init(self):
        """Scan the store directory and open the read and write entrances once.

        Every entry of ``self.dir_`` except ``lock`` must be a regular file
        whose name matches ``LEGAL_STORE_FILE_REGEX``; the files are sorted by
        the integer value of their name. The last file becomes the read
        entrance, the first one the write entrance (the same handle and
        mapping are shared when there is only one file).

        :raises StoreNotSafetyShutdown: if an unexpected entry is found
        """
        with self.lock:
            if self.inited:
                return

            for entry in os.listdir(self.dir_):
                if entry == "lock":
                    continue

                entry_path = os.path.join(self.dir_, entry)
                is_store_file = (os.path.isfile(entry_path)
                                 and LEGAL_STORE_FILE_REGEX.match(entry) is not None)
                if not is_store_file:
                    raise StoreNotSafetyShutdown("Store did not shutdown safety last time.")
                self.legal_files.append(entry_path)

            self.legal_files = sorted(self.legal_files, key=lambda k: int(os.path.basename(k)))

            if len(self.legal_files) > 0:
                reader = open(self.legal_files[-1], "r+")
                self.file_handles[READ_ENTRANCE] = reader
                self.map_handles[READ_ENTRANCE] = mmap.mmap(reader.fileno(), self.store_file_size)

                if len(self.legal_files) != 1:
                    writer = open(self.legal_files[0], "r+")
                    self.file_handles[WRITE_ENTRANCE] = writer
                    self.map_handles[WRITE_ENTRANCE] = mmap.mmap(writer.fileno(), self.store_file_size)
                else:
                    # a single file serves both entrances
                    self.file_handles[WRITE_ENTRANCE] = self.file_handles[READ_ENTRANCE]
                    self.map_handles[WRITE_ENTRANCE] = self.map_handles[READ_ENTRANCE]

            self.inited = True
开发者ID:awai0707,项目名称:cola,代码行数:29,代码来源:store.py


示例19: test_offset

    def test_offset (self):
        """Check mmap's ``offset`` argument with invalid and valid values.

        Invalid offsets (-2, -1, None) must raise ValueError, TypeError or
        OverflowError. A mapping that starts one allocation granule into the
        file must begin with ``b'foo'``.
        """
        fobj = open (TESTFN, 'w+b')

        try: # unlink TESTFN no matter what
            granule = mmap.ALLOCATIONGRANULARITY
            self.make_mmap_file (fobj, granule).close ()
            fobj.close ()

            full_size = granule * 2
            # Try invalid offset
            fobj = open(TESTFN, "r+b")
            for bad_offset in (-2, -1, None):
                try:
                    m = mmap.mmap(fobj.fileno(), full_size, offset=bad_offset)
                    # reaching this line means the bad offset was accepted
                    self.assertEqual(0, 1)
                except (ValueError, TypeError, OverflowError):
                    pass
                else:
                    self.assertEqual(0, 0)
            fobj.close()

            # Try valid offset, hopefully 8192 works on all OSes
            fobj = open(TESTFN, "r+b")
            m = mmap.mmap(fobj.fileno(), full_size - granule, offset=granule)
            self.assertEqual(m[0:3], b'foo')
            fobj.close()
            m.close()

        finally:
            fobj.close()
            try:
                os.unlink(TESTFN)
            except OSError:
                pass
开发者ID:LinkedModernismProject,项目名称:web_code,代码行数:35,代码来源:test_mmap.py


示例20: __init__

    def __init__(self, config, threshold=5, false_start=5, blue_shmem_name="/blue.shmem",
                 red_shmem_name="/red.shmem", clear_sem_name="/clear.sem",
                 mem=4096):
        """Set up the shared memory, the semaphore and the helper process.

        Creates (or opens) the blue and red shared memory segments and the
        "clear" semaphore, maps both segments read-only, and starts the
        ``goldwire`` program through sudo.

        :param config: passed to the helper program as first argument
        :param threshold: stored on the instance and passed to the helper
        :param false_start: stored on the instance
        :param blue_shmem_name: name of the blue shared memory segment
        :param red_shmem_name: name of the red shared memory segment
        :param clear_sem_name: name of the semaphore
        :param mem: size of each shared memory segment in bytes
        """
        import posix_ipc as ipc
        self.ipc = ipc
        self.threshold = threshold
        self.false_start = false_start
        self.mem = mem

        # 0o666: read/write permission bits for everyone
        self.blue_shmem = ipc.SharedMemory(
            blue_shmem_name, ipc.O_CREAT, mode=0o666, size=self.mem)
        self.red_shmem = ipc.SharedMemory(
            red_shmem_name, ipc.O_CREAT, mode=0o666, size=self.mem)
        self.clear_sem = ipc.Semaphore(clear_sem_name, ipc.O_CREAT, 0o666, 0)

        # this side only reads the segments
        self.red_map = mmap.mmap(
            self.red_shmem.fd, self.mem, mmap.MAP_SHARED, mmap.PROT_READ)
        self.blue_map = mmap.mmap(
            self.blue_shmem.fd, self.mem, mmap.MAP_SHARED, mmap.PROT_READ)

        # spawning C program that handles gpio signals
        helper_args = ['', '/home/koral/goldio/goldwire', config, str(threshold), str(mem)]
        Popen(helper_args,
              executable='/usr/bin/sudo',
              stdout=PIPE)
开发者ID:xkoralsky,项目名称:goldsprints,代码行数:26,代码来源:device.py



注:本文中的mmap.mmap函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python audit.AuditFactory类代码示例发布时间:2022-05-27
下一篇:
Python mmap.error函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap