• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python core.AnalogSignal类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中neo.core.AnalogSignal的典型用法代码示例。如果您正苦于以下问题:Python AnalogSignal类的具体用法?Python AnalogSignal怎么用?Python AnalogSignal使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了AnalogSignal类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: read_analogsignal

 def read_analogsignal(self, channel_index=None, lazy=False, cascade=True):
     """
     Read raw traces
     Arguments:
         channel_index: must be integer array
     """
     if self._attrs["app_data"]:
         bit_volts = self._attrs["app_data"]["channel_bit_volts"]
         sig_unit = "uV"
     else:
         bit_volts = np.ones((self._attrs["shape"][1]))  # TODO: find conversion in phy generated files
         sig_unit = "bit"
     if lazy:
         anasig = AnalogSignal(
             [],
             units=sig_unit,
             sampling_rate=self._attrs["kwik"]["sample_rate"] * pq.Hz,
             t_start=self._attrs["kwik"]["start_time"] * pq.s,
         )
         # we add the attribute lazy_shape with the size if loaded
         anasig.lazy_shape = self._attrs["shape"][0]
     else:
         data = self._kwd["recordings"][str(self._dataset)]["data"].value[:, channel_index]
         data = data * bit_volts[channel_index]
         anasig = AnalogSignal(
             data,
             units=sig_unit,
             sampling_rate=self._attrs["kwik"]["sample_rate"] * pq.Hz,
             t_start=self._attrs["kwik"]["start_time"] * pq.s,
         )
         data = []  # delete from memory
     # for attributes out of neo you can annotate
     anasig.annotate(info="raw traces")
     return anasig
开发者ID:jakirkham,项目名称:python-neo,代码行数:34,代码来源:kwikio.py


示例2: read_analogsignal

    def read_analogsignal(self ,
                          # the 2 first key arguments are imposed by neo.io API
                          lazy = False,
                          cascade = True,
                          channel_index = 0,
                          segment_duration = 15.,
                          t_start = -1,
                          ):
        """
        With this IO AnalogSignal can e acces directly with its channel number

        """
        sr = 10000.
        sinus_freq = 3. # Hz
        #time vector for generated signal:
        tvect = np.arange(t_start, t_start+ segment_duration , 1./sr)


        if lazy:
            anasig = AnalogSignal([], units='V', sampling_rate=sr * pq.Hz,
                                  t_start=t_start * pq.s,
                                  channel_index=channel_index)
            # we add the attribute lazy_shape with the size if loaded
            anasig.lazy_shape = tvect.shape
        else:
            # create analogsignal (sinus of 3 Hz)
            sig = np.sin(2*np.pi*tvect*sinus_freq + channel_index/5.*2*np.pi)+np.random.rand(tvect.size)
            anasig = AnalogSignal(sig, units= 'V', sampling_rate=sr * pq.Hz,
                                  t_start=t_start * pq.s,
                                  channel_index=channel_index)

        # for attributes out of neo you can annotate
        anasig.annotate(info = 'it is a sinus of %f Hz' %sinus_freq )

        return anasig
开发者ID:CINPLA,项目名称:python-neo,代码行数:35,代码来源:exampleio.py


示例3: create_analogsignal3

    def create_analogsignal3(self, parent=None, name='AnalogSignal3'):
        signal = AnalogSignal([[1, 2, 3], [4, 5, 6]], units='mV',
                              sampling_rate=2 * pq.kHz, t_start=100 * pq.s)

        signal.segment = parent
        self._assign_basic_attributes(signal, name=name)

        return signal
开发者ID:INM-6,项目名称:python-neo,代码行数:8,代码来源:test_nsdfio.py


示例4: create_analogsignal2

    def create_analogsignal2(self, parent=None, name='AnalogSignal2'):
        signal = AnalogSignal([[1], [2], [3], [4], [5]], units='mA',
                              sampling_period=0.5 * pq.ms)

        signal.segment = parent
        self._assign_annotations(signal)

        return signal
开发者ID:INM-6,项目名称:python-neo,代码行数:8,代码来源:test_nsdfio.py


示例5: create_analogsignal

    def create_analogsignal(self, parent=None, name='AnalogSignal1'):
        signal = AnalogSignal([[1.0, 2.5], [2.2, 3.1], [3.2, 4.4]], units='mV',
                              sampling_rate=100 * pq.Hz, t_start=2 * pq.min)

        signal.segment = parent
        self._assign_basic_attributes(signal, name=name)
        self._assign_annotations(signal)

        return signal
开发者ID:INM-6,项目名称:python-neo,代码行数:9,代码来源:test_nsdfio.py


示例6: read_analogsignal

    def read_analogsignal(self,
                          # the 2 first key arguments are imposed by neo.io
                          lazy = False,
                          cascade = True,
                          #channel index as given by the neuroshare API
                          channel_index = 0,
                          #time in seconds to be read
                          segment_duration = 0.,
                          #time in seconds to start reading from
                          t_start = 0.,
                          ):
        
        #some controls:        
        #if no segment duration is given, use the complete file
        if segment_duration ==0.:
            segment_duration=float(self.metadata["TimeSpan"])
        #if the segment duration is bigger than file, use the complete file
        if segment_duration >=float(self.metadata["TimeSpan"]):
            segment_duration=float(self.metadata["TimeSpan"])
            
        if lazy:
            anasig = AnalogSignal([], units="V", sampling_rate =  self.metadata["sampRate"] * pq.Hz,
                                  t_start=t_start * pq.s,
                                  )
            #create a dummie time vector                     
            tvect = np.arange(t_start, t_start+ segment_duration , 1./self.metadata["sampRate"])                                  
            # we add the attribute lazy_shape with the size if loaded
            anasig.lazy_shape = tvect.shape
        else:
            #get the analog object
            sig =  self.fd.get_entity(channel_index)
            #get the units (V, mV etc)            
            sigUnits = sig.units
            #get the electrode number
            chanName = sig.label[-4:]
            
            #transform t_start into index (reading will start from this index)           
            startat = int(t_start*self.metadata["sampRate"])
            #get the number of bins to read in
            bins = int((segment_duration+t_start) * self.metadata["sampRate"])
            
            #if the number of bins to read is bigger than 
            #the total number of bins, read only till the end of analog object
            if startat+bins > sig.item_count:
                bins = sig.item_count-startat
            #read the data from the sig object
            sig,_,_ = sig.get_data(index = startat, count = bins)
            #store it to the 'AnalogSignal' object
            anasig = AnalogSignal(sig, units = sigUnits, sampling_rate=self.metadata["sampRate"] * pq.Hz,
                                  t_start=t_start * pq.s,
                                  t_stop = (t_start+segment_duration)*pq.s,
                                  channel_index=channel_index)

            # annotate from which electrode the signal comes from
            anasig.annotate(info = "signal from channel %s" %chanName )

        return anasig
开发者ID:bal47,项目名称:python-neo,代码行数:57,代码来源:neuroshareapiio.py


示例7: read_segment

    def read_segment(self, n_start, n_stop, chlist=None, lazy=False, cascade=True):
        """Reads a Segment from the file and stores in database.

        The Segment will contain one AnalogSignal for each channel
        and will go from n_start to n_stop (in samples).

        Arguments:
            n_start : time in samples that the Segment begins
            n_stop : time in samples that the Segment ends

        Python indexing is used, so n_stop is not inclusive.

        Returns a Segment object containing the data.
        """
        # If no channel numbers provided, get all of them
        if chlist is None:
            chlist = self.loader.get_neural_channel_numbers()

        # Conversion from bits to full_range units
        conversion = self.full_range / 2**(8*self.header.sample_width)

        # Create the Segment
        seg = Segment(file_origin=self.filename)
        t_start = float(n_start) / self.header.f_samp
        t_stop = float(n_stop) / self.header.f_samp
        seg.annotate(t_start=t_start)
        seg.annotate(t_stop=t_stop)

        # Load data from each channel and store
        for ch in chlist:
            if lazy:
                sig = np.array([]) * conversion
            else:
                # Get the data from the loader
                sig = np.array(\
                    self.loader._get_channel(ch)[n_start:n_stop]) * conversion

            # Create an AnalogSignal with the data in it
            anasig = AnalogSignal(signal=sig,
                sampling_rate=self.header.f_samp*pq.Hz,
                t_start=t_start*pq.s, file_origin=self.filename,
                description='Channel %d from %f to %f' % (ch, t_start, t_stop),
                channel_index=int(ch))

            if lazy:
                anasig.lazy_shape = n_stop-n_start


            # Link the signal to the segment
            seg.analogsignals.append(anasig)

            # Link the signal to the recording channel from which it came
            #rc = self.channel_number_to_recording_channel[ch]
            #rc.analogsignals.append(anasig)

        return seg
开发者ID:ChrisNolan1992,项目名称:python-neo,代码行数:56,代码来源:blackrockio.py


示例8: _read_analogsignalarray

 def _read_analogsignalarray(self, node, parent):
     attributes = self._get_standard_attributes(node)
     # todo: handle channel_index
     sampling_rate = self._get_quantity(node["sampling_rate"])
     t_start = self._get_quantity(node["t_start"])
     signal = AnalogSignal(self._get_quantity(node["signal"]),
                           sampling_rate=sampling_rate, t_start=t_start,
                           **attributes)
     signal.segment = parent
     self.object_refs[node.attrs["object_ref"]] = signal
     return signal
开发者ID:INM-6,项目名称:python-neo,代码行数:11,代码来源:hdf5io.py


示例9: read_block

    def read_block(self, lazy=False, cascade=True):

        if self.filename is not None:
            self.stfio_rec = stfio.read(self.filename)

        bl = Block()
        bl.description = self.stfio_rec.file_description
        bl.annotate(comment=self.stfio_rec.comment)
        try:
            bl.rec_datetime = self.stfio_rec.datetime
        except:
            bl.rec_datetime = None

        if not cascade:
            return bl

        dt = np.round(self.stfio_rec.dt * 1e-3, 9) * pq.s  # ms to s
        sampling_rate = 1.0/dt
        t_start = 0 * pq.s

        # iterate over sections first:
        for j, recseg in enumerate(self.stfio_rec[0]):
            seg = Segment(index=j)
            length = len(recseg)

            # iterate over channels:
            for i, recsig in enumerate(self.stfio_rec):
                name = recsig.name
                unit = recsig.yunits
                try:
                    pq.Quantity(1, unit)
                except:
                    unit = ''

                if lazy:
                    signal = pq.Quantity([], unit)
                else:
                    signal = pq.Quantity(recsig[j], unit)
                anaSig = AnalogSignal(signal, sampling_rate=sampling_rate,
                                      t_start=t_start, name=str(name),
                                      channel_index=i)
                if lazy:
                    anaSig.lazy_shape = length
                seg.analogsignals.append(anaSig)

            bl.segments.append(seg)
            t_start = t_start + length * dt

        bl.create_many_to_one_relationship()

        return bl
开发者ID:ChrisNolan1992,项目名称:python-neo,代码行数:51,代码来源:stimfitio.py


示例10: _extract_signals

    def _extract_signals(self, data, metadata, lazy):

        signal = None
        if lazy and data.size > 0:
            signal = AnalogSignal([],
                                  units=self._determine_units(metadata),
                                  sampling_period=metadata['dt']*pq.ms)
            signal.lazy_shape = None
        else:
            arr = numpy.vstack(self._extract_array(data, channel_index)
                               for channel_index in range(metadata['first_index'], metadata['last_index'] + 1))
            if len(arr) > 0:
                signal = AnalogSignal(arr.T,
                                      units=self._determine_units(metadata),
                                      sampling_period=metadata['dt']*pq.ms)
        if signal is not None:
            signal.annotate(label=metadata["label"],
                            variable=metadata["variable"])
        return signal
开发者ID:CINPLA,项目名称:python-neo,代码行数:19,代码来源:pynnio.py


示例11: create_all_annotated

    def create_all_annotated(cls):
        times = cls.rquant(1, pq.s)
        signal = cls.rquant(1, pq.V)
        blk = Block()
        blk.annotate(**cls.rdict(3))

        seg = Segment()
        seg.annotate(**cls.rdict(4))
        blk.segments.append(seg)

        asig = AnalogSignal(signal=signal, sampling_rate=pq.Hz)
        asig.annotate(**cls.rdict(2))
        seg.analogsignals.append(asig)

        isig = IrregularlySampledSignal(times=times, signal=signal,
                                        time_units=pq.s)
        isig.annotate(**cls.rdict(2))
        seg.irregularlysampledsignals.append(isig)

        epoch = Epoch(times=times, durations=times)
        epoch.annotate(**cls.rdict(4))
        seg.epochs.append(epoch)

        event = Event(times=times)
        event.annotate(**cls.rdict(4))
        seg.events.append(event)

        spiketrain = SpikeTrain(times=times, t_stop=pq.s, units=pq.s)
        d = cls.rdict(6)
        d["quantity"] = pq.Quantity(10, "mV")
        d["qarray"] = pq.Quantity(range(10), "mA")
        spiketrain.annotate(**d)
        seg.spiketrains.append(spiketrain)

        chx = ChannelIndex(name="achx", index=[1, 2], channel_ids=[0, 10])
        chx.annotate(**cls.rdict(5))
        blk.channel_indexes.append(chx)

        unit = Unit()
        unit.annotate(**cls.rdict(2))
        chx.units.append(unit)

        return blk
开发者ID:theunissenlab,项目名称:python-neo,代码行数:43,代码来源:test_nixio.py


示例12: read_analogsignal

    def read_analogsignal(self,
                      channel_index=None,
                      lazy=False,
                      cascade=True,
                      ):
        """
        Read raw traces
        Arguments:
            channel_index: must be integer
        """
        try:
            channel_index = int(channel_index)
        except TypeError:
            print('channel_index must be int, not %s' %type(channel_index))

        if self._attrs['app_data']:
            bit_volts = self._attrs['app_data']['channel_bit_volts']
            sig_unit = 'uV'
        else:
            bit_volts = np.ones((self._attrs['shape'][1])) # TODO: find conversion in phy generated files
            sig_unit =  'bit'
        if lazy:
            anasig = AnalogSignal([],
                                  units=sig_unit,
                                  sampling_rate=self._attrs['kwik']['sample_rate']*pq.Hz,
                                  t_start=self._attrs['kwik']['start_time']*pq.s,
                                  channel_index=channel_index,
                                  )
            # we add the attribute lazy_shape with the size if loaded
            anasig.lazy_shape = self._attrs['shape'][0]
        else:
            data = self._kwd['recordings'][str(self._dataset)]['data'].value[:,channel_index]
            data = data * bit_volts[channel_index]
            anasig = AnalogSignal(data,
                                       units=sig_unit,
                                       sampling_rate=self._attrs['kwik']['sample_rate']*pq.Hz,
                                       t_start=self._attrs['kwik']['start_time']*pq.s,
                                       channel_index=channel_index,
                                       )
            data = [] # delete from memory
        # for attributes out of neo you can annotate
        anasig.annotate(info='raw traces')
        return anasig
开发者ID:bal47,项目名称:python-neo,代码行数:43,代码来源:kwikio.py


示例13: _extract_signal

 def _extract_signal(self, data, metadata, channel_index, lazy):
     signal = None
     if lazy:
         if channel_index in data[:, 1]:
             signal = AnalogSignal([],
                                   units=self._determine_units(metadata),
                                   sampling_period=metadata['dt']*pq.ms,
                                   channel_index=channel_index)
             signal.lazy_shape = None
     else:
         arr = self._extract_array(data, channel_index)
         if len(arr) > 0:
             signal = AnalogSignal(arr,
                                   units=self._determine_units(metadata),
                                   sampling_period=metadata['dt']*pq.ms,
                                   channel_index=channel_index)
     if signal is not None:
         signal.annotate(label=metadata["label"],
                         variable=metadata["variable"])
         return signal
开发者ID:bal47,项目名称:python-neo,代码行数:20,代码来源:pynnio.py


示例14: gettrace

def gettrace(trec,f):
    import numpy as np
    format_type_lenghts = [2,4,4,8]
    format_type = [np.int16,np.int32,np.float32,np.float64]
    pointsize = format_type_lenghts[int(trec.trDataFormat)]
    dtype = format_type[int(trec.trDataFormat)]
    f.seek(int(trec.trData))
    byte_string = f.read(int(trec.trDataPoints)*pointsize)
    import numpy as np
    ydata = np.fromstring(byte_string,dtype = dtype)
    tunit = pq.Quantity(1,str(trec.trXUnit))
    yunit = pq.Quantity(1,str(trec.trYUnit))
    sig = AnalogSignal(ydata*float(trec.trDataScaler)*yunit,
            sampling_period=float(trec.trXInterval)*tunit,
            units = trec.trYUnit[0])
    annotations = trec.__dict__.keys()
    annotations.remove('readlist')
    for a in annotations:
        d = {a:str(trec.__dict__[a])}
        sig.annotate(**d)
    return sig
开发者ID:psilentp,项目名称:Analysis,代码行数:21,代码来源:heka_io.py


示例15: read_analogsignal

    def read_analogsignal(self, lazy=False, cascade=True):
        if not HAVE_IGOR:
            raise Exception("igor package not installed. Try `pip install igor`")
        data = bw.load(self.filename)
        version = data['version']
        if version > 3:
            raise IOError("Igor binary wave file format version {0} is not supported.".format(version))
        content = data['wave']
        if "padding" in content:
            assert content['padding'].size == 0, "Cannot handle non-empty padding"
        if lazy:
            # not really lazy, since the `igor` module loads the data anyway
            signal = np.array((), dtype=content['wData'].dtype)
        else:
            signal = content['wData']
        note = content['note']
        header = content['wave_header']
        name = header['bname']
        assert header['botFullScale'] == 0
        assert header['topFullScale'] == 0
        units = "".join(header['dataUnits'])
        time_units = "".join(header['xUnits']) or "s"
        t_start = pq.Quantity(header['hsB'], time_units)
        sampling_period = pq.Quantity(header['hsA'], time_units)
        if self.parse_notes:
            try:
                annotations = self.parse_notes(note)
            except ValueError:
                warn("Couldn't parse notes field.")
                annotations = {'note': note}
        else:
            annotations = {'note': note}

        signal = AnalogSignal(signal, units=units, copy=False, t_start=t_start,
                              sampling_period=sampling_period, name=name,
                              file_origin=self.filename, **annotations)
        if lazy:
            signal.lazy_shape = content['wData'].shape
        return signal
开发者ID:CINPLA,项目名称:python-neo,代码行数:39,代码来源:igorproio.py


示例16: read_segment


#.........这里部分代码省略.........
                    unit = dataBlockHeader['Unit']
                    pos = pos_spikes[chan,unit]
                    stimearrays[chan, unit][pos] = time
                    if load_spike_waveform and n1*n2 != 0 :
                        swfarrays[chan,unit][pos,:,:] = np.fromstring( fid.read(n1*n2*2) , dtype = 'i2').reshape(n1,n2).astype('f4')
                    else:
                        fid.seek(n1*n2*2,1)
                    pos_spikes[chan,unit] +=1
                
                elif dataBlockHeader['Type'] == 4:
                    # event
                    pos = eventpositions[chan]
                    evarrays[chan]['times'][pos] = time
                    evarrays[chan]['labels'][pos] = dataBlockHeader['Unit']
                    eventpositions[chan]+= 1

                elif dataBlockHeader['Type'] == 5:
                    #signal
                    data = np.fromstring( fid.read(n2*2) , dtype = 'i2').astype('f4')
                    sigarrays[chan][sample_positions[chan] : sample_positions[chan]+data.size] = data
                    sample_positions[chan] += data.size


        ## Step 4: create neo object
        for chan, h in iteritems(eventHeaders):
            if lazy:
                times = []
                labels = None
            else:
                times = evarrays[chan]['times']
                labels = evarrays[chan]['labels']
            ea = EventArray(
                times*pq.s,
                labels=labels,
                channel_name=eventHeaders[chan]['Name'],
                channel_index=chan
            )
            if lazy:
                ea.lazy_shape = nb_events[chan]
            seg.eventarrays.append(ea)

            
        for chan, h in iteritems(slowChannelHeaders):
            if lazy:
                signal = [ ]
            else:
                if globalHeader['Version'] ==100 or globalHeader['Version'] ==101 :
                    gain = 5000./(2048*slowChannelHeaders[chan]['Gain']*1000.)
                elif globalHeader['Version'] ==102 :
                    gain = 5000./(2048*slowChannelHeaders[chan]['Gain']*slowChannelHeaders[chan]['PreampGain'])
                elif globalHeader['Version'] >= 103:
                    gain = globalHeader['SlowMaxMagnitudeMV']/(.5*(2**globalHeader['BitsPerSpikeSample'])*\
                                                        slowChannelHeaders[chan]['Gain']*slowChannelHeaders[chan]['PreampGain'])
                signal = sigarrays[chan]*gain
            anasig =  AnalogSignal(signal*pq.V,
                sampling_rate = float(slowChannelHeaders[chan]['ADFreq'])*pq.Hz,
                t_start = t_starts[chan]*pq.s,
                channel_index = slowChannelHeaders[chan]['Channel'],
                channel_name = slowChannelHeaders[chan]['Name'],
            )
            if lazy:
                anasig.lazy_shape = nb_samples[chan]
            seg.analogsignals.append(anasig)
            
        for (chan, unit), value in np.ndenumerate(nb_spikes):
            if nb_spikes[chan, unit] == 0: continue
            if lazy:
                times = [ ]
                waveforms = None
                t_stop = 0
            else:
                times = stimearrays[chan,unit]
                t_stop = times.max()
                if load_spike_waveform:
                    if globalHeader['Version'] <103:
                        gain = 3000./(2048*dspChannelHeaders[chan]['Gain']*1000.)
                    elif globalHeader['Version'] >=103 and globalHeader['Version'] <105:
                        gain = globalHeader['SpikeMaxMagnitudeMV']/(.5*2.**(globalHeader['BitsPerSpikeSample'])*1000.)
                    elif globalHeader['Version'] >105:
                        gain = globalHeader['SpikeMaxMagnitudeMV']/(.5*2.**(globalHeader['BitsPerSpikeSample'])*globalHeader['SpikePreAmpGain'])                    
                    waveforms = swfarrays[chan, unit] * gain * pq.V
                else:
                    waveforms = None
            sptr = SpikeTrain(
                times,
                units='s', 
                t_stop=t_stop*pq.s,
                waveforms=waveforms
            )
            sptr.annotate(unit_name = dspChannelHeaders[chan]['Name'])
            sptr.annotate(channel_index = chan)
            for key, val in dspChannelHeaders[chan].iteritems():
                sptr.annotate(**{key: val})

            if lazy:
                sptr.lazy_shape = nb_spikes[chan,unit]
            seg.spiketrains.append(sptr)

        seg.create_many_to_one_relationship()
        return seg
开发者ID:bal47,项目名称:python-neo,代码行数:101,代码来源:plexonio.py


示例17: read_block


#.........这里部分代码省略.........
                sampling_rate = 1. / (header['fADCSampleInterval'] *
                                      nbchannel * 1.e-6) * pq.Hz
            elif version >= 2.:
                sampling_rate = 1.e6 / \
                    header['protocol']['fADCSequenceInterval'] * pq.Hz

            # construct block
            # one sweep = one segment in a block
            pos = 0
            for j in range(episode_array.size):
                seg = Segment(index=j)

                length = episode_array[j]['len']

                if version < 2.:
                    fSynchTimeUnit = header['fSynchTimeUnit']
                elif version >= 2.:
                    fSynchTimeUnit = header['protocol']['fSynchTimeUnit']

                if (fSynchTimeUnit != 0) and (mode == 1):
                    length /= fSynchTimeUnit

                if not lazy:
                    subdata = data[pos:pos+length]
                    subdata = subdata.reshape((int(subdata.size/nbchannel),
                                               nbchannel)).astype('f')
                    if dt == np.dtype('i2'):
                        if version < 2.:
                            reformat_integer_v1(subdata, nbchannel, header)
                        elif version >= 2.:
                            reformat_integer_v2(subdata, nbchannel, header)

                pos += length

                if version < 2.:
                    chans = [chan_num for chan_num in
                             header['nADCSamplingSeq'] if chan_num >= 0]
                else:
                    chans = range(nbchannel)
                for n, i in enumerate(chans[:nbchannel]):  # fix SamplingSeq
                    if version < 2.:
                        name = header['sADCChannelName'][i].replace(b' ', b'')
                        unit = header['sADCUnits'][i].replace(b'\xb5', b'u').\
                            replace(b' ', b'').decode('utf-8')  # \xb5 is µ
                        num = header['nADCPtoLChannelMap'][i]
                    elif version >= 2.:
                        lADCIi = header['listADCInfo'][i]
                        name = lADCIi['ADCChNames'].replace(b' ', b'')
                        unit = lADCIi['ADCChUnits'].replace(b'\xb5', b'u').\
                            replace(b' ', b'').decode('utf-8')
                        num = header['listADCInfo'][i]['nADCNum']
                    if (fSynchTimeUnit == 0):
                        t_start = float(episode_array[j]['offset']) / sampling_rate
                    else:
                        t_start = float(episode_array[j]['offset']) * fSynchTimeUnit *1e-6* pq.s
                    t_start = t_start.rescale('s')
                    try:
                        pq.Quantity(1, unit)
                    except:
                        unit = ''

                    if lazy:
                        signal = [] * pq.Quantity(1, unit)
                    else:
                        signal = pq.Quantity(subdata[:, n], unit)

                    anaSig = AnalogSignal(signal, sampling_rate=sampling_rate,
                                          t_start=t_start,
                                          name=str(name),
                                          channel_index=int(num))
                    if lazy:
                        anaSig.lazy_shape = length / nbchannel
                    seg.analogsignals.append(anaSig)
                bl.segments.append(seg)

            if mode in [3, 5]:  # TODO check if tags exits in other mode
                # tag is EventArray that should be attached to Block
                # It is attched to the first Segment
                times = []
                labels = []
                comments = []
                for i, tag in enumerate(header['listTag']):
                    times.append(tag['lTagTime']/sampling_rate)
                    labels.append(str(tag['nTagType']))
                    comments.append(clean_string(tag['sComment']))
                times = np.array(times)
                labels = np.array(labels, dtype='S')
                comments = np.array(comments, dtype='S')
                # attach all tags to the first segment.
                seg = bl.segments[0]
                if lazy:
                    ea = Event(times=[] * pq.s, labels=np.array([], dtype='S'))
                    ea.lazy_shape = len(times)
                else:
                    ea = Event(times=times * pq.s, labels=labels,
                               comments=comments)
                seg.events.append(ea)

        bl.create_many_to_one_relationship()
        return bl
开发者ID:kylerbrown,项目名称:python-neo,代码行数:101,代码来源:axonio.py


示例18: read_one_channel_continuous

    def read_one_channel_continuous(self, fid, channel_num, header,
                                    take_ideal_sampling_rate, lazy=True):
        # read AnalogSignal
        channelHeader = header.channelHeaders[channel_num]

        # data type
        if channelHeader.kind == 1:
            dt = np.dtype('i2')
        elif channelHeader.kind == 9:
            dt = np.dtype('f4')

        # sample rate
        if take_ideal_sampling_rate:
            sampling_rate = channelHeader.ideal_rate * pq.Hz
        else:
            if header.system_id in [1, 2, 3, 4, 5]:  # Before version 5
                #~ print channel_num, channelHeader.divide, \
                #~ header.us_per_time, header.time_per_adc
                sample_interval = (channelHeader.divide * header.us_per_time *
                                   header.time_per_adc) * 1e-6
            else:
                sample_interval = (channelHeader.l_chan_dvd *
                                   header.us_per_time * header.dtime_base)
            sampling_rate = (1. / sample_interval) * pq.Hz

        # read blocks header to preallocate memory by jumping block to block
        if channelHeader.blocks==0:
            return [ ]
        fid.seek(channelHeader.firstblock)
        blocksize = [0]
        starttimes = []
        for b in range(channelHeader.blocks):
            blockHeader = HeaderReader(fid, np.dtype(blockHeaderDesciption))
            if len(blocksize) > len(starttimes):
                starttimes.append(blockHeader.start_time)
            blocksize[-1] += blockHeader.items

            if blockHeader.succ_block > 0:
                # ugly but CED does not guarantee continuity in AnalogSignal
                fid.seek(blockHeader.succ_block)
                nextBlockHeader = HeaderReader(fid,
                                               np.dtype(blockHeaderDesciption))
                sample_interval = (blockHeader.end_time -
                                   blockHeader.start_time) / \
                                  (blockHeader.items - 1)
                interval_with_next = nextBlockHeader.start_time - \
                    blockHeader.end_time
                if interval_with_next > sample_interval:
                    blocksize.append(0)
                fid.seek(blockHeader.succ_block)

        ana_sigs = []
        if channelHeader.unit in unit_convert:
            unit = pq.Quantity(1, unit_convert[channelHeader.unit])
        else:
            # print channelHeader.unit
            try:
                unit = pq.Quantity(1, channelHeader.unit)
            except:
                unit = pq.Quantity(1, '')

        for b, bs in enumerate(blocksize):
            if lazy:
                signal = [] * unit
            else:
                signal = pq.Quantity(np.empty(bs, dtype='f4'), units=unit)
            ana_sig = AnalogSignal(
                signal, sampling_rate=sampling_rate,
                t_start=(starttimes[b] * header.us_per_time *
                         header.dtime_base * pq.s),
                channel_index=channel_num)
            ana_sigs.append(ana_sig)

        if lazy:
            for s, ana_sig in enumerate(ana_sigs):
                ana_sig.lazy_shape = blocksize[s]

        else:
            # read data  by jumping block to block
            fid.seek(channelHeader.firstblock)
            pos = 0
            numblock = 0
            for b in range(channelHeader.blocks):
                blockHeader = HeaderReader(
                    fid, np.dtype(blockHeaderDesciption))
                # read data
                sig = np.fromstring(fid.read(blockHeader.items * dt.itemsize),
                                    dtype=dt)
                ana_sigs[numblock][pos:pos + sig.size] = \
                    sig.reshape(-1, 1).astype('f4') * unit
                pos += sig.size
                if pos >= blocksize[numblock]:
                    numblock += 1
                    pos = 0
                # jump to next block
                if blockHeader.succ_block > 0:
                    fid.seek(blockHeader.succ_block)

        # convert for int16
        if dt.kind == 'i':
#.........这里部分代码省略.........
开发者ID:MartinHeroux,项目名称:ScientificallySound_files,代码行数:101,代码来源:spike2io.py


示例19: read_nsx

    def read_nsx(self, filename_nsx, seg, lazy, cascade):
        # basic header
        dt0 = [('header_id','S8'),
                    ('ver_major','uint8'),
                    ('ver_minor','uint8'),
                    ('header_size', 'uint32'), #i.e. index of first data
                    ('group_label', 'S16'),# Read number of packet bytes, i.e. byte per samplepos
                    ('comments', 'S256'),
                    ('period_ratio', 'uint32'),
                    ('sampling_rate', 'uint32'),
                    ('window_datetime', 'S16'),
                    ('nb_channel', 'uint32'),
                ]
        nsx_header = h = np.fromfile(filename_nsx, count = 1, dtype = dt0)[0]
        version = '{0}.{1}'.format(h['ver_major'], h['ver_minor'])
        seg.annotate(blackrock_version = version)
        seg.rec_datetime = get_window_datetime(nsx_header['window_datetime'])
        nb_channel = h['nb_channel']
        sr = float(h['sampling_rate'])/h['period_ratio']

        if not cascade:
            return
        
        # extended header = channel information
        dt1 = [('header_id','S2'),
                    ('channel_id', 'uint16'),
                    ('label', 'S16'),
                    ('connector_id', 'uint8'),
                    ('connector_pin', 'uint8'),
                    ('min_digital_val', 'int16'),
                    ('max_digital_val', 'int16'),
                    ('min_analog_val', 'int16'),
                    ('max_analog_val', 'int16'),
                    ('units', 'S16'),
                    ('hi_freq_corner',  'uint32'),
                    ('hi_freq_order',  'uint32'),
                    ('hi_freq_type',  'uint16'), #0=None 1=Butterworth
                    ('lo_freq_corner',  'uint32'),
                    ('lo_freq_order',  'uint32'),
                    ('lo_freq_type',  'uint16'), #0=None 1=Butterworth
            ]
        channels_header = ch= np.memmap(filename_nsx, shape = nb_channel,
                    offset = np.dtype(dt0).itemsize,   dtype = dt1)
        
        # read data
        dt2 = [('header_id','uint8'),
                    ('n_start','uint32'),
                    ('nb_sample','uint32'),
                    ]
        sample_header = sh =  np.memmap(filename_nsx, dtype = dt2, shape = 1,
                        offset = nsx_header['header_size'])[0]
        nb_sample = sample_header['nb_sample']
        data = np.memmap(filename_nsx, dtype = 'int16', shape = (nb_sample, nb_channel),
                        offset = nsx_header['header_size'] +np.dtype(dt2).itemsize )
        
        # create new objects
        for i in range(nb_channel):
            unit = channels_header['units'][i].decode()
            if lazy:
                sig = [ ]
            else:
                sig = data[:,i].astype(float)
                # dig value to physical value
                if ch['max_analog_val'][i] == -ch['min_analog_val'][i] and\
                     ch['max_digital_val'][i] == -ch['min_digital_val'][i]:
                    # when symmetric it is simple
                    sig *= float(ch['max_analog_val'][i])/float(ch['max_digital_val'][i])
                else:
                    # general case
                    sig -= ch['min_digital_val'][i]
                    sig *= float(ch['max_analog_val'][i] - ch['min_analog_val'])/\
                                    float(ch['max_digital_val'][i] - ch['min_digital_val'])
                    sig += float(ch['min_analog_val'][i])
            anasig = AnalogSignal(signal = pq.Quantity(sig,unit, copy = False),
                                                        sampling_rate = sr*pq.Hz,
                                                        t_start = sample_header['n_start']/sr*pq.s,
                                                        name = str(ch['label'][i]),
                                                        channel_index = int(ch['channel_id'][i]))
            if lazy:
                anasig.lazy_shape = nb_sample
            seg.analogsignals.append(anasig)
开发者ID:guangxingli,项目名称:python-neo,代码行数:81,代码来源:blackrockio.py


示例20: read_analogsignal

该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python core.Block类代码示例发布时间:2022-05-27
下一篇:
Python rosutils.RosUtils类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap