
Python files_helper.FilesHelper class code examples


This article collects typical usage examples of tvb.core.entities.file.files_helper.FilesHelper in Python. If you are wondering how the FilesHelper class is used in practice, or are looking for working examples of it, the selected code samples below may help.



A total of 20 code examples of the FilesHelper class are shown below, sorted by popularity by default.

Example 1: launch

    def launch(self, weights, tracts, input_data):
        """
        Execute import operations: process the weights and tracts csv files, then use
        the reference connectivity passed as input_data for the rest of the attributes.

        :param weights: csv file containing the weights measures
        :param tracts:  csv file containing the tracts measures
        :param input_data: a reference connectivity with the additional attributes

        :raises LaunchException: when the number of nodes in CSV files doesn't match the one in the connectivity
        """
        dti_service = DTIPipelineService()
        dti_service._process_csv_file(weights, dti_service.WEIGHTS_FILE)
        dti_service._process_csv_file(tracts, dti_service.TRACT_FILE)
        weights_matrix = read_list_data(os.path.join(os.path.dirname(weights), dti_service.WEIGHTS_FILE))
        tract_matrix = read_list_data(os.path.join(os.path.dirname(tracts), dti_service.TRACT_FILE))
        FilesHelper.remove_files([os.path.join(os.path.dirname(weights), dti_service.WEIGHTS_FILE), 
                                  os.path.join(os.path.dirname(tracts), dti_service.TRACT_FILE)])

        if weights_matrix.shape[0] != input_data.orientations.shape[0]:
            raise LaunchException("The csv files define %s nodes but the connectivity you selected as reference "
                                  "has only %s nodes." % (weights_matrix.shape[0], input_data.orientations.shape[0]))
        result = Connectivity()
        result.storage_path = self.storage_path
        result.nose_correction = input_data.nose_correction
        result.centres = input_data.centres
        result.region_labels = input_data.region_labels
        result.weights = weights_matrix
        result.tract_lengths = tract_matrix
        result.orientations = input_data.orientations
        result.areas = input_data.areas
        result.cortical = input_data.cortical
        result.hemispheres = input_data.hemispheres
        return result
Developer ID: HuifangWang, project: the-virtual-brain-website, lines of code: 34, source file: csv_connectivity_importer.py


Example 2: ProjectUpdateManager

class ProjectUpdateManager(UpdateManager):
    """
    This goes through all the scripts that are newer than the version number
    written in the current project metadata xml, and executes them on the project folder.
    """

    def __init__(self, project_path):

        self.project_path = project_path
        self.files_helper = FilesHelper()
        # This assumes that old project metadata file can be parsed by current version.
        self.project_meta = self.files_helper.read_project_metadata(project_path)
        from_version = int(self.project_meta.get('version', 0))

        super(ProjectUpdateManager, self).__init__(project_update_scripts, from_version,
                                                   TvbProfile.current.version.PROJECT_VERSION)


    def run_all_updates(self):
        """
        Upgrade the project to the latest structure
        Go through all update scripts, from project version up to the current_version in the code
        """
        super(ProjectUpdateManager, self).run_all_updates(project_path=self.project_path)

        # update project version in metadata
        self.project_meta['version'] = self.current_version
        self.files_helper.write_project_metadata_from_dict(self.project_path, self.project_meta)
Developer ID: LauHoiYanGladys, project: tvb-framework, lines of code: 28, source file: project_update_manager.py
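
The version-gated update described in the docstring above can be sketched without any TVB dependency. This is a minimal illustration only: `run_pending_updates` and the `{version: callable}` mapping are hypothetical names chosen for this sketch, not the actual `UpdateManager` API.

```python
def run_pending_updates(scripts, from_version, current_version, **kwargs):
    """
    Run every update script whose version is newer than from_version and
    not newer than current_version, in ascending version order.

    `scripts` is a hypothetical mapping {version_number: callable}.
    Returns the list of versions that were applied.
    """
    applied = []
    for version in sorted(scripts):
        if from_version < version <= current_version:
            scripts[version](**kwargs)
            applied.append(version)
    return applied


# Usage: a project stored at version 1 is upgraded to code version 3.
log = []
scripts = {
    1: lambda project_path: log.append("v1"),
    2: lambda project_path: log.append("v2"),
    3: lambda project_path: log.append("v3"),
    4: lambda project_path: log.append("v4"),
}
applied = run_pending_updates(scripts, from_version=1, current_version=3,
                              project_path="/tmp/project")
```

Only after all selected scripts have run would the stored version be bumped, which mirrors the order used in `run_all_updates` above.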


Example 3: ImportService

class ImportService():
    """
    Service for importing TVB entities into system.
    It supports TVB exported H5 files as input, but it should also handle H5 files 
    generated outside of TVB, as long as they respect the same structure.
    """


    def __init__(self):
        self.logger = get_logger(__name__)
        self.user_id = None
        self.files_helper = FilesHelper()
        self.created_projects = []


    def _download_and_unpack_project_zip(self, uploaded, uq_file_name, temp_folder):

        if isinstance(uploaded, FieldStorage) or isinstance(uploaded, Part):
            if not uploaded.file:
                raise ProjectImportException("Please select the archive which contains the project structure.")
            with open(uq_file_name, 'wb') as file_obj:
                self.files_helper.copy_file(uploaded.file, file_obj)
        else:
            shutil.copy2(uploaded, uq_file_name)

        try:
            self.files_helper.unpack_zip(uq_file_name, temp_folder)
        except FileStructureException as excep:
            self.logger.exception(excep)
            raise ProjectImportException("Bad ZIP archive provided. A TVB exported project is expected!")
Developer ID: paolavals, project: tvb-framework, lines of code: 30, source file: import_service.py
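
The unpack-and-translate-the-error pattern used in `_download_and_unpack_project_zip` can be approximated with the standard library alone. The sketch below is written for Python 3 and assumes nothing about how `FilesHelper.unpack_zip` is implemented; `unpack_archive` and `ArchiveError` are hypothetical names.

```python
import os
import tempfile
import zipfile


class ArchiveError(Exception):
    """Hypothetical stand-in for ProjectImportException."""


def unpack_archive(zip_path, target_folder):
    """Extract zip_path into target_folder and return the extracted paths."""
    try:
        with zipfile.ZipFile(zip_path) as archive:
            archive.extractall(target_folder)
            names = archive.namelist()
    except zipfile.BadZipFile as excep:
        # Translate the low-level error into a domain-specific one.
        raise ArchiveError("Bad ZIP archive provided: %s" % excep)
    return [os.path.join(target_folder, name) for name in names]


# Usage: one valid archive and one file that is not a ZIP at all.
work_dir = tempfile.mkdtemp()
good_zip = os.path.join(work_dir, "project.zip")
with zipfile.ZipFile(good_zip, "w") as archive:
    archive.writestr("Project.xml", "<project/>")
extracted = unpack_archive(good_zip, os.path.join(work_dir, "unpacked"))

bad_zip = os.path.join(work_dir, "broken.zip")
with open(bad_zip, "w") as file_obj:
    file_obj.write("this is not a zip archive")
try:
    unpack_archive(bad_zip, os.path.join(work_dir, "unpacked_bad"))
    bad_archive_rejected = False
except ArchiveError:
    bad_archive_rejected = True
```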


Example 4: __init__

class ProjectService:
    """
    Services layer for Project entities.
    """


    def __init__(self):
        self.logger = get_logger(__name__)
        self.structure_helper = FilesHelper()


    def store_project(self, current_user, is_create, selected_id, **data):
        """
        We want to create/update a project entity.
        """
        #Validate Unique Name
        new_name = data["name"]
        if len(new_name) < 1:
            raise ProjectServiceException("Invalid project name!")
        projects_no = dao.count_projects_for_name(new_name, selected_id)
        if projects_no > 0:
            err = {'name': 'Please choose another name, this one is used!'}
            raise formencode.Invalid("Duplicate Name Error", {}, None, error_dict=err)
        started_operations = dao.get_operation_numbers(selected_id)[1]
        if started_operations > 0:
            raise ProjectServiceException("A project can not be renamed while operations are still running!")
        if is_create:
            current_proj = model.Project(new_name, current_user.id, data["description"])
            self.structure_helper.get_project_folder(current_proj)
        else:
            try:
                current_proj = dao.get_project_by_id(selected_id)
            except Exception as excep:
                self.logger.exception("An error has occurred!")
                raise ProjectServiceException(str(excep))
            if current_proj.name != new_name:
                self.structure_helper.rename_project_structure(current_proj.name, new_name)
            current_proj.name = new_name
            current_proj.description = data["description"]
        #Commit to make sure we have a valid ID
        current_proj.refresh_update_date()
        self.structure_helper.write_project_metadata(current_proj)
        current_proj = dao.store_entity(current_proj)

        #Retrieve, to initialize lazy attributes
        current_proj = dao.get_project_by_id(current_proj.id)
        #Update share settings on current Project entity
        visited_pages = []
        prj_admin = current_proj.administrator.username
        if 'visited_pages' in data and data['visited_pages']:
            visited_pages = data['visited_pages'].split(',')
        for page in visited_pages:
            members = UserService.retrieve_all_users(prj_admin, int(page))[0]
            members = [m.id for m in members]
            dao.delete_members_for_project(current_proj.id, members)
        selected_user_ids = data["users"]
        dao.add_members_to_project(current_proj.id, selected_user_ids)
        #Finish operation
        self.logger.debug("Edit/Save OK for project:" + str(current_proj.id) + ' by user:' + current_user.username)
        return current_proj
Developer ID: gummadhav, project: tvb-framework, lines of code: 60, source file: project_service.py


Example 5: export

    def export(self, data, export_folder, project):
        """
        Exports data type:
        1. If data is a normal data type, simply exports storage file (HDF format)
        2. If data is a DataTypeGroup creates a zip with all files for all data types
        """
        download_file_name = self.get_export_file_name(data)
        files_helper = FilesHelper()
         
        if self.is_data_a_group(data):
            all_datatypes = self._get_all_data_types_arr(data)
            
            if all_datatypes is None or len(all_datatypes) == 0:
                raise ExportException("Could not export a data type group with no data")    
            
            zip_file = os.path.join(export_folder, download_file_name)
            
            # Now process each data type from group and add it to ZIP file
            operation_folders = []
            for data_type in all_datatypes:
                operation_folder = files_helper.get_operation_folder(project.name, data_type.fk_from_operation)
                operation_folders.append(operation_folder)
                
            # Create ZIP archive    
            files_helper.zip_folders(zip_file, operation_folders, self.OPERATION_FOLDER_PREFIX)
                        
            return download_file_name, zip_file, True

        else:
            project_folder = files_helper.get_project_folder(project)
            data_file = os.path.join(project_folder, data.get_storage_file_path())

            return download_file_name, data_file, False
Developer ID: LauHoiYanGladys, project: tvb-framework, lines of code: 33, source file: tvb_export.py


Example 6: launch

    def launch(self, weights, weights_delimiter, tracts, tracts_delimiter, input_data):
        """
        Execute import operations: process the weights and tracts csv files, then use
        the reference connectivity passed as input_data for the rest of the attributes.

        :param weights: csv file containing the weights measures
        :param tracts:  csv file containing the tracts measures
        :param input_data: a reference connectivity with the additional attributes

        :raises LaunchException: when the number of nodes in CSV files doesn't match the one in the connectivity
        """
        weights_matrix = self._read_csv_file(weights, weights_delimiter)
        tract_matrix = self._read_csv_file(tracts, tracts_delimiter)

        FilesHelper.remove_files([weights, tracts])

        if weights_matrix.shape[0] != input_data.number_of_regions:
            raise LaunchException("The csv files define %s nodes but the connectivity you selected as reference "
                                  "has only %s nodes." % (weights_matrix.shape[0], input_data.number_of_regions))
        result = Connectivity()
        result.storage_path = self.storage_path
        result.centres = input_data.centres
        result.region_labels = input_data.region_labels
        result.weights = weights_matrix
        result.tract_lengths = tract_matrix
        result.orientations = input_data.orientations
        result.areas = input_data.areas
        result.cortical = input_data.cortical
        result.hemispheres = input_data.hemispheres
        return result
Developer ID: LauHoiYanGladys, project: tvb-framework, lines of code: 30, source file: csv_connectivity_importer.py
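
The node-count validation in the two `launch` examples boils down to parsing a delimited matrix and comparing its row count with the reference connectivity. A minimal standard-library sketch follows; `read_matrix`, `check_node_count` and `NodeCountMismatch` are hypothetical names, and the real importer works on numpy arrays rather than nested lists.

```python
import csv
import io


class NodeCountMismatch(Exception):
    """Hypothetical stand-in for LaunchException."""


def read_matrix(csv_text, delimiter=","):
    """Parse delimited text into a list of float rows, skipping empty lines."""
    reader = csv.reader(io.StringIO(csv_text), delimiter=delimiter)
    return [[float(cell) for cell in row] for row in reader if row]


def check_node_count(matrix, expected_nodes):
    """Raise if the number of rows differs from the reference node count."""
    if len(matrix) != expected_nodes:
        raise NodeCountMismatch(
            "The csv files define %s nodes but the reference connectivity has %s nodes."
            % (len(matrix), expected_nodes))
    return matrix


# Usage: a 2x2 weights matrix with ';' as delimiter.
weights = read_matrix("0;1.5\n2.5;0\n", delimiter=";")
check_node_count(weights, 2)
try:
    check_node_count(weights, 3)
    mismatch_detected = False
except NodeCountMismatch:
    mismatch_detected = True
```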


Example 7: export_project

    def export_project(self, project, optimize_size=False):
        """
        Given a project root and the TVB storage_path, create a ZIP
        ready for export.
        :param project: project object which identifies project to be exported
        """
        if project is None:
            raise ExportException("Please provide project to be exported")

        files_helper = FilesHelper()
        project_folder = files_helper.get_project_folder(project)
        project_datatypes = self._gather_project_datatypes(project, optimize_size)
        to_be_exported_folders = []
        considered_op_ids = []
        min_dt_date = datetime.now()

        if optimize_size:
            ## take only the DataType with visibility flag set ON
            for dt in project_datatypes:
                if dt[KEY_OPERATION_ID] not in considered_op_ids:
                    to_be_exported_folders.append({'folder': files_helper.get_project_folder(project,
                                                                                             str(dt[KEY_OPERATION_ID])),
                                                   'archive_path_prefix': str(dt[KEY_OPERATION_ID]) + os.sep})
                    considered_op_ids.append(dt[KEY_OPERATION_ID])
                    if min_dt_date > dt[KEY_DT_DATE]:
                        min_dt_date = dt[KEY_DT_DATE]
        else:
            to_be_exported_folders.append({'folder': project_folder,
                                           'archive_path_prefix': '', 'exclude': ["TEMP"]})
            if project_datatypes:
                min_dt_date = min([dt[KEY_DT_DATE] for dt in project_datatypes])

        # Compute path and name of the zip file
        now = datetime.now()
        date_str = now.strftime("%Y-%m-%d_%H-%M")
        zip_file_name = "%s_%s.%s" % (date_str, project.name, self.ZIP_FILE_EXTENSION)

        export_folder = self._build_data_export_folder(project)
        result_path = os.path.join(export_folder, zip_file_name)

        with TvbZip(result_path, "w") as zip_file:
            # Pack project [filtered] content into a ZIP file:
            LOG.debug("Done preparing, now we will write folders " + str(len(to_be_exported_folders)))
            LOG.debug(str(to_be_exported_folders))
            for pack in to_be_exported_folders:
                zip_file.write_folder(**pack)
            LOG.debug("Done exporting files, now we will write the burst configurations...")
            self._export_bursts(project, project_datatypes, zip_file)
            LOG.debug("Done exporting burst configurations, now we will export linked DTs")
            self._export_linked_datatypes(project, zip_file, min_dt_date)
            ## Make sure the Project.xml file gets copied:
            if optimize_size:
                LOG.debug("Done linked, now we write the project xml")
                zip_file.write(files_helper.get_project_meta_file_path(project.name), files_helper.TVB_PROJECT_FILE)
            LOG.debug("Done, closing")

        return result_path
Developer ID: unimauro, project: tvb-framework, lines of code: 57, source file: export_manager.py
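
Each dictionary in `to_be_exported_folders` is passed to `zip_file.write_folder(**pack)` with the keys `folder`, `archive_path_prefix` and, optionally, `exclude`. How `TvbZip` implements this is not shown in the example; the function below is only a guess at equivalent behavior using the standard `zipfile` module, written for Python 3.

```python
import os
import tempfile
import zipfile


def write_folder(zip_file, folder, archive_path_prefix="", exclude=None):
    """
    Add every file under `folder` to the open ZipFile `zip_file`.
    Stored names are relative to `folder`, prefixed with archive_path_prefix.
    Sub-folders whose name is listed in `exclude` are skipped entirely.
    """
    exclude = exclude or []
    for root, dirs, files in os.walk(folder):
        # Prune excluded folders in place so os.walk does not descend into them.
        dirs[:] = [d for d in dirs if d not in exclude]
        for file_name in files:
            full_path = os.path.join(root, file_name)
            relative = os.path.relpath(full_path, folder)
            # ZIP entry names always use forward slashes.
            arc_name = (archive_path_prefix + relative).replace(os.sep, "/")
            zip_file.write(full_path, arc_name)


# Usage: export a project folder without TEMP, then one operation folder under a prefix.
project_folder = tempfile.mkdtemp()
os.makedirs(os.path.join(project_folder, "1"))
os.makedirs(os.path.join(project_folder, "TEMP"))
for relative in ("Project.xml", os.path.join("1", "data.h5"),
                 os.path.join("TEMP", "scratch.txt")):
    with open(os.path.join(project_folder, relative), "w") as file_obj:
        file_obj.write("x")

zip_path = os.path.join(tempfile.mkdtemp(), "export.zip")
with zipfile.ZipFile(zip_path, "w") as zip_file:
    write_folder(zip_file, project_folder, archive_path_prefix="", exclude=["TEMP"])
    write_folder(zip_file, os.path.join(project_folder, "1"), archive_path_prefix="7/")
with zipfile.ZipFile(zip_path) as zip_file:
    archived = sorted(zip_file.namelist())
```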


Example 8: _update_datatype_disk_size

    def _update_datatype_disk_size(self, file_path):
        """
        Computes and updates the disk_size attribute of the DataType for which the given file was created.
        """
        file_handler = FilesHelper()
        datatype_gid = self._get_manager(file_path).get_gid_attribute()
        datatype = dao.get_datatype_by_gid(datatype_gid)

        if datatype is not None:
            datatype.disk_size = file_handler.compute_size_on_disk(file_path)
            dao.store_entity(datatype)
Developer ID: unimauro, project: tvb-framework, lines of code: 11, source file: files_update_manager.py
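
The example relies on `compute_size_on_disk` without showing what it does. One plausible way to compute such a value is sketched below; the unit (kilobytes, rounded up) and the handling of folders are assumptions of this sketch, not confirmed behavior of `FilesHelper`.

```python
import os
import tempfile


def size_on_disk_kb(path):
    """Return the size of a file, or the total size of a folder tree, in KB (rounded up)."""
    if os.path.isfile(path):
        total_bytes = os.path.getsize(path)
    else:
        total_bytes = 0
        for root, _, files in os.walk(path):
            for file_name in files:
                total_bytes += os.path.getsize(os.path.join(root, file_name))
    # Integer ceiling division: any partial kilobyte counts as a full one.
    return (total_bytes + 1023) // 1024


# Usage: one file of exactly 1024 bytes and one file of a single byte.
folder = tempfile.mkdtemp()
with open(os.path.join(folder, "a.bin"), "wb") as file_obj:
    file_obj.write(b"\0" * 1024)
with open(os.path.join(folder, "b.bin"), "wb") as file_obj:
    file_obj.write(b"\0")
file_kb = size_on_disk_kb(os.path.join(folder, "a.bin"))
folder_kb = size_on_disk_kb(folder)
```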


Example 9: initialize_storage

def initialize_storage():
    """
    Create Projects storage root folder in case it does not exist.
    """
    try:
        helper = FilesHelper()
        helper.check_created()
    except FileStructureException:
        # Do nothing, because we do not have any UI to display exception
        logger = get_logger("tvb.core.services.initialize_storage")
        logger.exception("Could not make sure the root folder exists!")
Developer ID: rajul, project: tvb-framework, lines of code: 11, source file: project_service.py


Example 10: get_gifty_file_name

def get_gifty_file_name(project_id, desired_name):
    """
    Compute non-existent file name, in the TEMP folder of
    the given project.
    Try desired_name, and if already exists, try adding a number.
    """
    if project_id:
        project = dao.get_project_by_id(project_id)
        file_helper = FilesHelper()
        temp_path = file_helper.get_project_folder(project, FilesHelper.TEMP_FOLDER)
        return get_unique_file_name(temp_path, desired_name)[0]
    return get_unique_file_name(cfg.TVB_STORAGE, desired_name)[0]
Developer ID: HuifangWang, project: the-virtual-brain-website, lines of code: 12, source file: helper_handler.py
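
The docstring above describes the strategy "try desired_name, and if it already exists, try adding a number". `get_unique_file_name` itself is not shown, so the following is a hypothetical equivalent; the return order `(full_path, file_name)` and the `_N` suffix format are assumptions.

```python
import os
import tempfile


def unique_file_name(folder, desired_name):
    """
    Return (full_path, file_name) for a name that does not yet exist in folder.
    Tries desired_name first, then inserts _1, _2, ... before the extension.
    """
    base, extension = os.path.splitext(desired_name)
    candidate = desired_name
    index = 0
    while os.path.exists(os.path.join(folder, candidate)):
        index += 1
        candidate = "%s_%d%s" % (base, index, extension)
    return os.path.join(folder, candidate), candidate


# Usage: the second request for the same name gets a numbered variant.
folder = tempfile.mkdtemp()
first_path, first_name = unique_file_name(folder, "result.csv")
open(first_path, "w").close()
second_path, second_name = unique_file_name(folder, "result.csv")
```

The check and the later file creation are not atomic, so two concurrent callers could still receive the same name; that is acceptable for a TEMP folder but worth keeping in mind.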


Example 11: _adapt_epileptor_simulations

def _adapt_epileptor_simulations():
    """
    Previous simulations on the EpileptorPermittivityCoupling model should be converted to use the Epileptor model.
    Because the parameters of the two models have different ranges and defaults, we do not translate parameters;
    we only set Epileptor as the model instead of EpileptorPermittivityCoupling, and leave the model params at their defaults.
    """
    session = SA_SESSIONMAKER()
    epileptor_old = "EpileptorPermittivityCoupling"
    epileptor_new = "Epileptor"
    param_model = "model"

    try:
        all_ep_ops = session.query(model.Operation).filter(
            model.Operation.parameters.ilike('%"' + epileptor_old + '"%')).all()
        files_helper = FilesHelper()
        all_bursts = dict()

        for ep_op in all_ep_ops:
            try:
                op_params = parse_json_parameters(ep_op.parameters)
                if op_params[param_model] != epileptor_old:
                    LOGGER.debug("Skipping op " + str(op_params[param_model]) + " -- " + str(ep_op))
                    continue

                LOGGER.debug("Updating " + str(op_params))
                op_params[param_model] = epileptor_new
                ep_op.parameters = json.dumps(op_params, cls=MapAsJson.MapAsJsonEncoder)
                LOGGER.debug("New params:" + ep_op.parameters)
                files_helper.write_operation_metadata(ep_op)

                burst = dao.get_burst_for_operation_id(ep_op.id)
                if burst is not None:
                    LOGGER.debug("Updating burst:" + str(burst))
                    burst.prepare_after_load()
                    burst.simulator_configuration[param_model] = {'value': epileptor_new}
                    burst._simulator_configuration = json.dumps(burst.simulator_configuration,
                                                                cls=MapAsJson.MapAsJsonEncoder)
                    if burst.id not in all_bursts:
                        all_bursts[burst.id] = burst

            except Exception:
                LOGGER.exception("Could not process " + str(ep_op))

        session.add_all(all_ep_ops)
        session.add_all(all_bursts.values())
        session.commit()

    except Exception:
        LOGGER.exception("Could not update Simulation Epileptor Params")
    finally:
        session.close()
Developer ID: amitsaroj001, project: tvb-framework, lines of code: 51, source file: 7350_update_code.py
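
The `ilike` filter in the example is a substring match on the raw JSON text, which is why the loop parses each operation and re-checks the `model` key before changing anything. That two-step check can be isolated as below; `replace_model` is a hypothetical helper, and plain `json.dumps` is used instead of the `MapAsJson` encoder.

```python
import json


def replace_model(parameters_json, old_model, new_model, key="model"):
    """
    Return (new_json, changed). The value under `key` is replaced only when it
    equals old_model exactly; any other document is returned untouched.
    """
    params = json.loads(parameters_json)
    if params.get(key) != old_model:
        return parameters_json, False
    params[key] = new_model
    return json.dumps(params), True


# Usage: the second document contains the old name, but not under "model".
old_name = "EpileptorPermittivityCoupling"
matching = '{"model": "EpileptorPermittivityCoupling", "dt": 0.1}'
false_hit = '{"model": "OtherModel", "previous": "EpileptorPermittivityCoupling"}'

updated, changed = replace_model(matching, old_name, "Epileptor")
untouched, changed_false_hit = replace_model(false_hit, old_name, "Epileptor")
```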


Example 12: introduce_unmapped_node

def introduce_unmapped_node(out_pth, conn_zip_pth):
    """
    Creates a connectivity with one extra node in the first position.
    This node represents the unmapped regions.
    :param out_pth: destination path
    :param conn_zip_pth: connectivity zip path.
    """
    fh = FilesHelper()
    tmp_pth = os.path.splitext(out_pth)[0]
    fh.check_created(tmp_pth)
    files = fh.unpack_zip(conn_zip_pth, tmp_pth)
    for file_name in files:
        file_name_low = file_name.lower()
        if "centres" in file_name_low:
            with open(file_name) as f:
                lines = f.readlines()
            with open(file_name, "w") as f:
                f.write("None  0.000000  0.000000  0.000000\n")
                f.writelines(lines)
        elif "weight" in file_name_low or "tract" in file_name_low:
            with open(file_name) as f:
                lines = f.readlines()
                nr_regions = len(lines)
            with open(file_name, "w") as f:
                f.write("   0.0000000e+00" * (nr_regions + 1) + "\n")
                for line in lines:
                    f.write("   0.0000000e+00" + line)
        else:
            raise Exception("this transformation does not support the file " + file_name)

    fh.zip_folder(out_pth, tmp_pth)
    fh.remove_folder(tmp_pth)
Developer ID: cgncvk, project: tvb-data, lines of code: 32, source file: adapt.py
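
For the weights and tracts files, `introduce_unmapped_node` prepends one row of zeros and one zero at the start of every existing row, working directly on text lines. The same transformation on an in-memory matrix looks like this; `add_unmapped_node` is a hypothetical name used only for this sketch.

```python
def add_unmapped_node(rows, fill=0.0):
    """
    Given a square matrix as a list of rows, return a new matrix with one extra
    node in the first position: a leading row and a leading column of `fill`.
    """
    size = len(rows)
    padded = [[fill] * (size + 1)]
    for row in rows:
        padded.append([fill] + list(row))
    return padded


# Usage: a 2x2 matrix becomes 3x3, with the new node at index 0.
weights = [[1.0, 2.0],
           [3.0, 4.0]]
padded = add_unmapped_node(weights)
```

As in the original, the width of the new first row is derived from the number of rows, so the input is assumed to be square.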


Example 13: _build_data_export_folder

    def _build_data_export_folder(self, data):
        """
        This method computes the folder where results of an export operation will be 
        stored for a while (e.g until download is done; or for 1 day)
        """
        now = datetime.now()
        date_str = "%d-%d-%d_%d-%d-%d_%d" % (now.year, now.month, now.day, now.hour,
                                             now.minute, now.second, now.microsecond)
        tmp_str = date_str + "@" + data.gid
        data_export_folder = os.path.join(self.export_folder, tmp_str)
        files_helper = FilesHelper()
        files_helper.check_created(data_export_folder)

        return data_export_folder
Developer ID: LauHoiYanGladys, project: tvb-framework, lines of code: 14, source file: export_manager.py
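
The folder name above is built with `%d`, so its components are not zero-padded and the names do not sort chronologically as strings. A possible variant with `strftime`, combined with an idempotent folder creation, is sketched below for Python 3. `build_export_folder` is a hypothetical function, and `os.makedirs(..., exist_ok=True)` is only assumed to be comparable to `check_created`.

```python
import os
import tempfile
from datetime import datetime


def build_export_folder(export_root, gid, now=None):
    """Create (if needed) and return a timestamped export folder for the given gid."""
    now = now or datetime.now()
    # %f is the microsecond, zero-padded to six digits.
    folder_name = now.strftime("%Y-%m-%d_%H-%M-%S_%f") + "@" + gid
    path = os.path.join(export_root, folder_name)
    os.makedirs(path, exist_ok=True)
    return path


# Usage with a fixed timestamp so the resulting name is predictable.
export_root = tempfile.mkdtemp()
fixed_time = datetime(2022, 5, 27, 9, 3, 7, 42)
export_path = build_export_folder(export_root, "abc", now=fixed_time)
# Calling it again for the same moment does not fail.
same_path = build_export_folder(export_root, "abc", now=fixed_time)
```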


Example 14: setUp

    def setUp(self):
        """
        Set up the context needed by the tests.
        """
        self.files_helper = FilesHelper()
        self.test_user = TestFactory.create_user()
        self.test_project = TestFactory.create_project(self.test_user, self.PROJECT_NAME)
Developer ID: amitsaroj001, project: tvb-framework, lines of code: 7, source file: files_helper_test.py


Example 15: _store_imported_datatypes_in_db

    def _store_imported_datatypes_in_db(self, project, all_datatypes, dt_burst_mappings, burst_ids_mapping):
        def by_time(dt):
            return dt.create_date or datetime.now()

        if burst_ids_mapping is None:
            burst_ids_mapping = {}
        if dt_burst_mappings is None:
            dt_burst_mappings = {}

        all_datatypes.sort(key=by_time)

        for datatype in all_datatypes:
            old_burst_id = dt_burst_mappings.get(datatype.gid)

            if old_burst_id is not None:
                datatype.fk_parent_burst = burst_ids_mapping[old_burst_id]

            datatype_allready_in_tvb = dao.get_datatype_by_gid(datatype.gid)

            if not datatype_allready_in_tvb:
                # Compute disk size. Similar to ABCAdapter._capture_operation_results.
                # No need to close the h5 as we have not written to it.
                associated_file = os.path.join(datatype.storage_path, datatype.get_storage_file_name())
                datatype.disk_size = FilesHelper.compute_size_on_disk(associated_file)

                self.store_datatype(datatype)
            else:
                FlowService.create_link([datatype_allready_in_tvb.id], project.id)
Developer ID: paolavals, project: tvb-framework, lines of code: 28, source file: import_service.py


Example 16: __init__

    def __init__(self):
        micro_postfix = "_%d" % int(time.time() * 1000000)

        # Here create all structures needed later for data types creation
        self.files_helper = FilesHelper()

        # First create user 
        user = model.User("datatype_factory_user" + micro_postfix, "test_pass",
                          "[email protected]" + micro_postfix, True, "user")
        self.user = dao.store_entity(user)

        # Now create a project
        project_service = ProjectService()
        data = dict(name='DatatypesFactoryProject' + micro_postfix, description='test_desc', users=[])
        self.project = project_service.store_project(self.user, True, None, **data)

        # Create algorithm
        alg_category = model.AlgorithmCategory('one', True)
        dao.store_entity(alg_category)
        ad = model.Algorithm(SIMULATOR_MODULE, SIMULATOR_CLASS, alg_category.id)
        self.algorithm = dao.get_algorithm_by_module(SIMULATOR_MODULE, SIMULATOR_CLASS)
        if self.algorithm is None:
            self.algorithm = dao.store_entity(ad)

        # Create an operation
        self.meta = {DataTypeMetaData.KEY_SUBJECT: self.USER_FULL_NAME,
                     DataTypeMetaData.KEY_STATE: self.DATATYPE_STATE}
        operation = model.Operation(self.user.id, self.project.id, self.algorithm.id, 'test parameters',
                                    meta=json.dumps(self.meta), status=model.STATUS_FINISHED)
        self.operation = dao.store_entity(operation)
Developer ID: maedoc, project: tvb-framework, lines of code: 30, source file: datatypes_factory.py


Example 17: transactional_setup_method

    def transactional_setup_method(self):
        """
        Reset the database before each test.
        """
        self.project_service = ProjectService()
        self.structure_helper = FilesHelper()
        self.test_user = TestFactory.create_user()
Developer ID: maedoc, project: tvb-framework, lines of code: 7, source file: project_service_test.py


Example 18: __init__

    def __init__(self):
        micro_postfix = "_%d" % int(time.time() * 1000000)

        # Here create all structures needed later for data types creation
        self.files_helper = FilesHelper()

        # First create user 
        user = model.User("datatype_factory_user" + micro_postfix, "test_pass",
                          "[email protected]" + micro_postfix, True, "user")
        self.user = dao.store_entity(user)

        # Now create a project
        project_service = ProjectService()
        data = dict(name='DatatypesFactoryProject' + micro_postfix, description='test_desc', users=[])
        self.project = project_service.store_project(self.user, True, None, **data)

        # Create algorithm
        alg_category = model.AlgorithmCategory('one', True)
        dao.store_entity(alg_category)
        alg_group = model.AlgorithmGroup("test_module1", "classname1", alg_category.id)
        dao.store_entity(alg_group)
        algorithm = model.Algorithm(alg_group.id, 'id', name='', req_data='', param_name='', output='')
        self.algorithm = dao.store_entity(algorithm)

        #Create an operation
        self.meta = {DataTypeMetaData.KEY_SUBJECT: self.USER_FULL_NAME,
                     DataTypeMetaData.KEY_STATE: self.DATATYPE_STATE}
        operation = model.Operation(self.user.id, self.project.id, self.algorithm.id, 'test parameters',
                                    meta=json.dumps(self.meta), status=model.STATUS_FINISHED,
                                    method_name=ABCAdapter.LAUNCH_METHOD)
        self.operation = dao.store_entity(operation)
Developer ID: sdiazpier, project: tvb-framework, lines of code: 31, source file: datatypes_factory.py


Example 19: export_project

    def export_project(self, project):
        """
        Given a project root and the TVB storage_path, create a ZIP
        ready for export.
        :param project: project object which identifies project to be exported
        """
        if project is None:
            raise ExportException("Please provide project to be exported")

        files_helper = FilesHelper()
        project_folder = files_helper.get_project_folder(project)
        
        bursts_dict = {}
        datatype_burst_mapping = {}
        bursts_count = dao.get_bursts_for_project(project.id, count=True)
        for start_idx in range(0, bursts_count, BURST_PAGE_SIZE):
            bursts = dao.get_bursts_for_project(project.id, page_start=start_idx, page_end=start_idx + BURST_PAGE_SIZE)
            for burst in bursts:
                self._build_burst_export_dict(burst, bursts_dict)
                
        datatypes_count = dao.get_datatypes_for_project(project.id, count=True)
        for start_idx in range(0, datatypes_count, DATAYPES_PAGE_SIZE):
            datatypes = dao.get_datatypes_for_project(project.id, page_start=start_idx,
                                                      page_end=start_idx + DATAYPES_PAGE_SIZE)
            for datatype in datatypes:
                datatype_burst_mapping[datatype.gid] = datatype.fk_parent_burst

        # Compute path and name of the zip file
        now = datetime.now()
        date_str = now.strftime("%Y-%m-%d_%H-%M")
        zip_file_name = "%s_%s.%s" % (date_str, project.name, self.ZIP_FILE_EXTENSION)
        
        export_folder = self._build_data_export_folder(project)    
        result_path = os.path.join(export_folder, zip_file_name) 
        
        bursts_file_name = os.path.join(project_folder, BURST_INFO_FILE)
        burst_info = {BURSTS_DICT_KEY: bursts_dict,
                      DT_BURST_MAP: datatype_burst_mapping}
        with open(bursts_file_name, 'w') as bursts_file:
            bursts_file.write(json.dumps(burst_info))
            
        # pack project content into a ZIP file
        result_zip = files_helper.zip_folder(result_path, project_folder)
        
        # remove these files, since we only want them in export archive
        os.remove(bursts_file_name)
        return result_zip
Developer ID: HuifangWang, project: the-virtual-brain-website, lines of code: 47, source file: export_manager.py
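
Both loops in this `export_project` walk a result set in fixed-size pages using `range(0, count, PAGE_SIZE)`. The pattern can be factored out as below. `iterate_pages` and `collect_all` are hypothetical helpers; the sketch clamps `page_end` to the total count and treats it as exclusive, whereas the exact semantics of `page_end` in the `dao` calls are not visible in the example.

```python
def iterate_pages(total_count, page_size):
    """Yield (page_start, page_end) pairs that together cover range(total_count)."""
    for page_start in range(0, total_count, page_size):
        yield page_start, min(page_start + page_size, total_count)


def collect_all(fetch_page, total_count, page_size):
    """Call fetch_page(page_start, page_end) for every page and concatenate the results."""
    items = []
    for page_start, page_end in iterate_pages(total_count, page_size):
        items.extend(fetch_page(page_start, page_end))
    return items


# Usage: seven records fetched three at a time from an in-memory list.
records = list(range(7))
pages = list(iterate_pages(len(records), 3))
fetched = collect_all(lambda start, end: records[start:end], len(records), 3)
```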


Example 20: setUp

    def setUp(self):
        """
        Reset the database before each test.
        """
        config.EVENTS_FOLDER = ''
        self.project_service = ProjectService()
        self.structure_helper = FilesHelper()
        self.test_user = TestFactory.create_user()
Developer ID: sdiazpier, project: tvb-framework, lines of code: 8, source file: project_service_test.py



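Note: The tvb.core.entities.file.files_helper.FilesHelper class examples in this article were compiled by 纯净天空 from source code and documentation hosting platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects contributed by their respective developers, and copyright of the source code belongs to the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.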

