本文整理汇总了Python中website.files.models.FileNode类的典型用法代码示例。如果您正苦于以下问题:Python FileNode类的具体用法?Python FileNode怎么用?Python FileNode使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了FileNode类的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: update_file_guid_referent
def update_file_guid_referent(self, node, event_type, payload, user=None):
if event_type == 'addon_file_moved' or event_type == 'addon_file_renamed':
source = payload['source']
destination = payload['destination']
source_node = Node.load(source['node']['_id'])
destination_node = node
file_guids = FileNode.resolve_class(source['provider'], FileNode.ANY).get_file_guids(
materialized_path=source['materialized'] if source['provider'] != 'osfstorage' else source['path'],
provider=source['provider'],
node=source_node)
if event_type == 'addon_file_renamed' and source['provider'] in settings.ADDONS_BASED_ON_IDS:
return
if event_type == 'addon_file_moved' and (source['provider'] == destination['provider'] and
source['provider'] in settings.ADDONS_BASED_ON_IDS) and source_node == destination_node:
return
for guid in file_guids:
obj = Guid.load(guid)
if source_node != destination_node and Comment.find(Q('root_target', 'eq', guid)).count() != 0:
update_comment_node(guid, source_node, destination_node)
if source['provider'] != destination['provider'] or source['provider'] != 'osfstorage':
old_file = FileNode.load(obj.referent._id)
obj.referent = create_new_file(obj, source, destination, destination_node)
obj.save()
if old_file and not TrashedFileNode.load(old_file._id):
old_file.delete()
开发者ID:caspinelli,项目名称:osf.io,代码行数:28,代码来源:comment.py
示例2: addon_delete_file_node
def addon_delete_file_node(self, node, user, event_type, payload):
""" Get addon StoredFileNode(s), move it into the TrashedFileNode collection
and remove it from StoredFileNode.
Required so that the guids of deleted addon files are not re-pointed when an
addon file or folder is moved or renamed.
"""
if event_type == 'file_removed' and payload.get('provider', None) != 'osfstorage':
provider = payload['provider']
path = payload['metadata']['path']
materialized_path = payload['metadata']['materialized']
if path.endswith('/'):
folder_children = FileNode.resolve_class(provider, FileNode.ANY).find(
Q('provider', 'eq', provider) &
Q('node', 'eq', node) &
Q('materialized_path', 'startswith', materialized_path)
)
for item in folder_children:
if item.kind == 'file' and not TrashedFileNode.load(item._id):
item.delete(user=user)
elif item.kind == 'folder':
StoredFileNode.remove_one(item.stored_object)
else:
try:
file_node = FileNode.resolve_class(provider, FileNode.FILE).find_one(
Q('node', 'eq', node) &
Q('materialized_path', 'eq', materialized_path)
)
except NoResultsFound:
file_node = None
if file_node and not TrashedFileNode.load(file_node._id):
file_node.delete(user=user)
开发者ID:baylee-d,项目名称:osf.io,代码行数:33,代码来源:views.py
示例3: test_comments_move_when_file_moved_to_osfstorage
def test_comments_move_when_file_moved_to_osfstorage(self):
osfstorage = self.project.get_addon('osfstorage')
root_node = osfstorage.get_root()
osf_file = root_node.append_file('file.txt')
osf_file.create_version(self.user, {
'object': '06d80e',
'service': 'cloud',
osfstorage_settings.WATERBUTLER_RESOURCE: 'osf',
}, {
'size': 1337,
'contentType': 'img/png',
'etag': 'abcdefghijklmnop'
}).save()
source = {
'path': '/file.txt',
'node': self.project,
'provider': self.provider
}
destination = {
'path': osf_file.path,
'node': self.project,
'provider': 'osfstorage'
}
self._create_file_with_comment(node=source['node'], path=source['path'])
payload = self._create_payload('move', self.user, source, destination, self.file._id, destination_file_id=destination['path'].strip('/'))
update_file_guid_referent(self=None, node=destination['node'], event_type='addon_file_moved', payload=payload)
self.guid.reload()
file_node = FileNode.resolve_class('osfstorage', FileNode.FILE).get_or_create(destination['node'], destination['path'])
assert_equal(self.guid._id, file_node.get_guid()._id)
file_comments = Comment.find(Q('root_target', 'eq', self.guid._id))
assert_equal(file_comments.count(), 1)
开发者ID:545zhou,项目名称:osf.io,代码行数:33,代码来源:test_comments.py
示例4: test_comments_move_when_folder_moved_to_different_provider
def test_comments_move_when_folder_moved_to_different_provider(self, destination_provider, destination_path):
if self.provider == destination_provider:
return True
self.project.add_addon(destination_provider, auth=Auth(self.user))
self.project.save()
self.addon_settings = self.project.get_addon(destination_provider)
self.addon_settings.folder = '/AddonFolder'
self.addon_settings.save()
source = {
'path': '/subfolder/',
'node': self.project,
'provider': self.provider
}
destination = {
'path': '/subfolder/',
'node': self.project,
'provider': destination_provider,
'children': [{
'path': '/subfolder/file.txt',
'node': self.project,
'provider': destination_provider
}]
}
file_name = 'file.txt'
self._create_file_with_comment(node=source['node'], path='{}{}'.format(source['path'], file_name))
payload = self._create_payload('move', self.user, source, destination, self.file._id)
update_file_guid_referent(self=None, node=destination['node'], event_type='addon_file_moved', payload=payload)
self.guid.reload()
file_node = FileNode.resolve_class(destination_provider, FileNode.FILE).get_or_create(destination['node'], destination_path)
assert_equal(self.guid._id, file_node.get_guid()._id)
file_comments = Comment.find(Q('root_target', 'eq', self.guid._id))
assert_equal(file_comments.count(), 1)
开发者ID:545zhou,项目名称:osf.io,代码行数:35,代码来源:test_comments.py
示例5: test_comments_move_when_folder_moved_to_different_provider
def test_comments_move_when_folder_moved_to_different_provider(self, destination_provider, destination_path):
if self.provider == destination_provider:
return True
self.project.add_addon(destination_provider, auth=Auth(self.user))
self.project.save()
self.addon_settings = self.project.get_addon(destination_provider)
self.addon_settings.folder = "/AddonFolder"
self.addon_settings.save()
source = {"path": "/subfolder/", "node": self.project, "provider": self.provider}
destination = {
"path": "/subfolder/",
"node": self.project,
"provider": destination_provider,
"children": [{"path": "/subfolder/file.txt", "node": self.project, "provider": destination_provider}],
}
file_name = "file.txt"
self._create_file_with_comment(node=source["node"], path="{}{}".format(source["path"], file_name))
payload = self._create_payload("move", self.user, source, destination, self.file._id)
update_file_guid_referent(self=None, node=destination["node"], event_type="addon_file_moved", payload=payload)
self.guid.reload()
file_node = FileNode.resolve_class(destination_provider, FileNode.FILE).get_or_create(
destination["node"], destination_path
)
assert_equal(self.guid._id, file_node.get_guid()._id)
file_comments = Comment.find(Q("root_target", "eq", self.guid._id))
assert_equal(file_comments.count(), 1)
开发者ID:ycchen1989,项目名称:osf.io,代码行数:29,代码来源:test_comments.py
示例6: test_comments_move_when_folder_moved_to_osfstorage
def test_comments_move_when_folder_moved_to_osfstorage(self):
osfstorage = self.project.get_addon("osfstorage")
root_node = osfstorage.get_root()
osf_folder = root_node.append_folder("subfolder")
osf_file = osf_folder.append_file("file.txt")
osf_file.create_version(
self.user,
{"object": "06d80e", "service": "cloud", osfstorage_settings.WATERBUTLER_RESOURCE: "osf"},
{"size": 1337, "contentType": "img/png", "etag": "1234567890abcde"},
).save()
source = {"path": "/subfolder/", "node": self.project, "provider": self.provider}
destination = {
"path": "/subfolder/",
"node": self.project,
"provider": "osfstorage",
"children": [{"path": "/subfolder/file.txt", "node": self.project, "provider": "osfstorage"}],
}
file_name = "file.txt"
self._create_file_with_comment(node=source["node"], path="{}{}".format(source["path"], file_name))
payload = self._create_payload(
"move", self.user, source, destination, self.file._id, destination_file_id=osf_file._id
)
update_file_guid_referent(self=None, node=destination["node"], event_type="addon_file_moved", payload=payload)
self.guid.reload()
file_node = FileNode.resolve_class("osfstorage", FileNode.FILE).get_or_create(destination["node"], osf_file._id)
assert_equal(self.guid._id, file_node.get_guid()._id)
file_comments = Comment.find(Q("root_target", "eq", self.guid._id))
assert_equal(file_comments.count(), 1)
开发者ID:ycchen1989,项目名称:osf.io,代码行数:30,代码来源:test_comments.py
示例7: create_new_file
def create_new_file(obj, source, destination, destination_node):
if not source['path'].endswith('/'):
data = dict(destination)
new_file = FileNode.resolve_class(destination['provider'], FileNode.FILE).get_or_create(destination_node, destination['path'])
if destination['provider'] != 'osfstorage':
new_file.update(revision=None, data=data)
else:
new_file = find_and_create_file_from_metadata(destination.get('children', []), source, destination, destination_node, obj)
if not new_file:
if source['provider'] == 'box':
new_path = obj.referent.path
else:
new_path = obj.referent.materialized_path.replace(source['materialized'], destination['materialized'])
new_file = FileNode.resolve_class(destination['provider'], FileNode.FILE).get_or_create(destination_node, new_path)
new_file.name = new_path.split('/')[-1]
new_file.materialized_path = new_path
new_file.save()
return new_file
开发者ID:digideskio,项目名称:osf.io,代码行数:18,代码来源:comment.py
示例8: update_comment_root_target_file
def update_comment_root_target_file(self, node, event_type, payload, user=None):
if event_type == 'addon_file_moved':
source = payload['source']
destination = payload['destination']
source_node = Node.load(source['node']['_id'])
destination_node = node
if (source.get('provider') == destination.get('provider') == 'osfstorage') and source_node._id != destination_node._id:
old_file = FileNode.load(source.get('path').strip('/'))
new_file = FileNode.resolve_class(destination.get('provider'), FileNode.FILE).get_or_create(destination_node, destination.get('path'))
Comment.update(Q('root_target', 'eq', old_file._id), data={'node': destination_node})
# update node record of commented files
if old_file._id in source_node.commented_files:
destination_node.commented_files[new_file._id] = source_node.commented_files[old_file._id]
del source_node.commented_files[old_file._id]
source_node.save()
destination_node.save()
开发者ID:mchelen,项目名称:osf.io,代码行数:19,代码来源:comment.py
示例9: update_comment_root_target_file
def update_comment_root_target_file(self, node, event_type, payload, user=None):
if event_type == 'addon_file_moved':
source = payload['source']
destination = payload['destination']
source_node = Node.load(source['node']['_id'])
destination_node = node
if (source.get('provider') == destination.get('provider') == 'osfstorage') and source_node._id != destination_node._id:
obj = FileNode.load(source.get('path').strip('/'))
update_folder_contents([obj], source_node, destination_node)
开发者ID:AllisonLBowers,项目名称:osf.io,代码行数:10,代码来源:comment.py
示例10: get_file_item
def get_file_item(self, item):
file_node = FileNode.resolve_class(
item['provider'],
FileNode.FOLDER if item['kind'] == 'folder'
else FileNode.FILE
).get_or_create(self.get_node(), item['path'])
file_node.update(None, item, user=self.request.user)
return file_node
开发者ID:luismulinari,项目名称:osf.io,代码行数:10,代码来源:views.py
示例11: get_file_item
def get_file_item(self, item):
file_node = FileNode.resolve_class(
item['provider'],
FileNode.FOLDER if item['kind'] == 'folder'
else FileNode.FILE
).get_or_create(self.get_node(check_object_permissions=False), item['path'])
file_node.update(None, item, user=self.request.user)
self.check_object_permissions(self.request, file_node)
return file_node
开发者ID:hmoco,项目名称:osf.io,代码行数:12,代码来源:views.py
示例12: find_and_create_file_from_metadata
def find_and_create_file_from_metadata(children, source, destination, destination_node, obj):
""" Given a Guid obj, recursively search for the metadata of its referent (a file obj)
in the waterbutler response. If found, create a new addon FileNode with that metadata
and return the new file.
"""
for item in children:
if item['kind'] == 'folder':
return find_and_create_file_from_metadata(item.get('children', []), source, destination, destination_node, obj)
elif item['kind'] == 'file' and item['materialized'].replace(destination['materialized'], source['materialized']) == obj.referent.materialized_path:
data = dict(item)
new_file = FileNode.resolve_class(destination['provider'], FileNode.FILE).get_or_create(destination_node, item['path'])
if destination['provider'] != 'osfstorage':
new_file.update(revision=None, data=data)
return new_file
开发者ID:digideskio,项目名称:osf.io,代码行数:14,代码来源:comment.py
示例13: test_comments_move_when_file_moved_from_subfolder_to_root
def test_comments_move_when_file_moved_from_subfolder_to_root(self):
source = {"path": "/subfolder/file.txt", "node": self.project, "provider": self.provider}
destination = {"path": "/file.txt", "node": self.project, "provider": self.provider}
self._create_file_with_comment(node=source["node"], path=source["path"])
payload = self._create_payload("move", self.user, source, destination, self.file._id)
update_file_guid_referent(self=None, node=destination["node"], event_type="addon_file_moved", payload=payload)
self.guid.reload()
file_node = FileNode.resolve_class(self.provider, FileNode.FILE).get_or_create(
destination["node"], self._format_path(destination["path"], file_id=self.file._id)
)
assert_equal(self.guid._id, file_node.get_guid()._id)
file_comments = Comment.find(Q("root_target", "eq", self.guid._id))
assert_equal(file_comments.count(), 1)
开发者ID:ycchen1989,项目名称:osf.io,代码行数:14,代码来源:test_comments.py
示例14: migrate_file_representation
def migrate_file_representation(bad_file):
logger.info('Migrating file representation of File: {0}'.format(bad_file))
view_url = bad_file['viewUrl'].rstrip('/')
fid = view_url.split('/')[-1]
fnode = FileNode.load(fid)
if fnode is None:
fnode = TrashedFileNode.load(fid)
assert fnode is not None, 'Could not load FileNode or TrashedFileNode with id {}'.format(fid)
data = {
'data': {
'kind': 'file',
'name': bad_file['selectedFileName'],
'path': fnode.path,
'extra': {},
'sha256': fnode.versions[-1].metadata['sha256']
}
}
bad_file.update(data)
开发者ID:kch8qx,项目名称:osf.io,代码行数:18,代码来源:migrate_registration_extra_drafts.py
示例15: test_comments_move_when_file_moved_from_subfolder_to_root
def test_comments_move_when_file_moved_from_subfolder_to_root(self):
source = {
'path': '/subfolder/file.txt',
'node': self.project,
'provider': self.provider
}
destination = {
'path': '/file.txt',
'node': self.project,
'provider': self.provider
}
self._create_file_with_comment(node=source['node'], path=source['path'])
payload = self._create_payload('move', self.user, source, destination, self.file._id)
update_file_guid_referent(self=None, node=destination['node'], event_type='addon_file_moved', payload=payload)
self.guid.reload()
file_node = FileNode.resolve_class(self.provider, FileNode.FILE).get_or_create(destination['node'], self._format_path(destination['path'], file_id=self.file._id))
assert_equal(self.guid._id, file_node.get_guid()._id)
file_comments = Comment.find(Q('root_target', 'eq', self.guid._id))
assert_equal(file_comments.count(), 1)
开发者ID:545zhou,项目名称:osf.io,代码行数:20,代码来源:test_comments.py
示例16: view_file
def view_file(request, node_id, provider, file_id):
file = FileNode.load(file_id)
wb_url = file.generate_waterbutler_url()
return redirect(wb_url)
开发者ID:DanielSBrown,项目名称:osf.io,代码行数:4,代码来源:views.py
示例17: addon_view_or_download_file
def addon_view_or_download_file(auth, path, provider, **kwargs):
extras = request.args.to_dict()
extras.pop('_', None) # Clean up our url params a bit
action = extras.get('action', 'view')
node = kwargs.get('node') or kwargs['project']
node_addon = node.get_addon(provider)
if not path:
raise HTTPError(httplib.BAD_REQUEST)
if not isinstance(node_addon, StorageAddonBase):
raise HTTPError(httplib.BAD_REQUEST, {
'message_short': 'Bad Request',
'message_long': 'The add-on containing this file is no longer connected to the {}.'.format(node.project_or_component)
})
if not node_addon.has_auth:
raise HTTPError(httplib.UNAUTHORIZED, {
'message_short': 'Unauthorized',
'message_long': 'The add-on containing this file is no longer authorized.'
})
if not node_addon.complete:
raise HTTPError(httplib.BAD_REQUEST, {
'message_short': 'Bad Request',
'message_long': 'The add-on containing this file is no longer configured.'
})
file_node = FileNode.resolve_class(provider, FileNode.FILE).get_or_create(node, path)
# Note: Cookie is provided for authentication to waterbutler
# it is overriden to force authentication as the current user
# the auth header is also pass to support basic auth
version = file_node.touch(
request.headers.get('Authorization'),
**dict(
extras,
cookie=request.cookies.get(settings.COOKIE_NAME)
)
)
if version is None:
if file_node.get_guid():
# If this file has been successfully view before but no longer exists
# Show a nice error message
return addon_deleted_file(file_node=file_node, **kwargs)
raise HTTPError(httplib.NOT_FOUND, {
'message_short': 'Not Found',
'message_long': 'This file does not exist'
})
# TODO clean up these urls and unify what is used as a version identifier
if request.method == 'HEAD':
return make_response(('', 200, {
'Location': file_node.generate_waterbutler_url(**dict(extras, direct=None, version=version.identifier))
}))
if action == 'download':
return redirect(file_node.generate_waterbutler_url(**dict(extras, direct=None, version=version.identifier)))
if len(request.path.strip('/').split('/')) > 1:
guid = file_node.get_guid(create=True)
return redirect(furl.furl('/{}/'.format(guid._id)).set(args=extras).url)
return addon_view_file(auth, node, file_node, version)
开发者ID:kms6bn,项目名称:osf.io,代码行数:67,代码来源:views.py
示例18: addon_view_or_download_file
def addon_view_or_download_file(auth, path, provider, **kwargs):
extras = request.args.to_dict()
extras.pop('_', None) # Clean up our url params a bit
action = extras.get('action', 'view')
node = kwargs.get('node') or kwargs['project']
node_addon = node.get_addon(provider)
provider_safe = markupsafe.escape(provider)
path_safe = markupsafe.escape(path)
project_safe = markupsafe.escape(node.project_or_component)
if not path:
raise HTTPError(httplib.BAD_REQUEST)
if not isinstance(node_addon, StorageAddonBase):
raise HTTPError(httplib.BAD_REQUEST, data={
'message_short': 'Bad Request',
'message_long': 'The {} add-on containing {} is no longer connected to {}.'.format(provider_safe, path_safe, project_safe)
})
if not node_addon.has_auth:
raise HTTPError(httplib.UNAUTHORIZED, data={
'message_short': 'Unauthorized',
'message_long': 'The {} add-on containing {} is no longer authorized.'.format(provider_safe, path_safe)
})
if not node_addon.complete:
raise HTTPError(httplib.BAD_REQUEST, data={
'message_short': 'Bad Request',
'message_long': 'The {} add-on containing {} is no longer configured.'.format(provider_safe, path_safe)
})
file_node = FileNode.resolve_class(provider, FileNode.FILE).get_or_create(node, path)
# Note: Cookie is provided for authentication to waterbutler
# it is overriden to force authentication as the current user
# the auth header is also pass to support basic auth
version = file_node.touch(
request.headers.get('Authorization'),
**dict(
extras,
cookie=request.cookies.get(settings.COOKIE_NAME)
)
)
if version is None:
return addon_deleted_file(file_node=file_node, path=path, **kwargs)
# TODO clean up these urls and unify what is used as a version identifier
if request.method == 'HEAD':
return make_response(('', 200, {
'Location': file_node.generate_waterbutler_url(**dict(extras, direct=None, version=version.identifier))
}))
if action == 'download':
return redirect(file_node.generate_waterbutler_url(**dict(extras, direct=None, version=version.identifier)))
if action == 'get_guid':
draft_id = extras.get('draft')
draft = DraftRegistration.load(draft_id)
if draft is None or draft.is_approved:
raise HTTPError(httplib.BAD_REQUEST, data={
'message_short': 'Bad Request',
'message_long': 'File not associated with required object.'
})
guid = file_node.get_guid(create=True)
guid.referent.save()
return dict(guid=guid._id)
if len(request.path.strip('/').split('/')) > 1:
guid = file_node.get_guid(create=True)
return redirect(furl.furl('/{}/'.format(guid._id)).set(args=extras).url)
return addon_view_file(auth, node, file_node, version)
开发者ID:baylee-d,项目名称:osf.io,代码行数:74,代码来源:views.py
示例19: get_metadata_files
def get_metadata_files(draft):
data = draft.registration_metadata
for q, question in get_file_questions('prereg-prize.json'):
if not isinstance(data[q]['value'], dict):
for i, file_info in enumerate(data[q]['extra']):
provider = file_info['data']['provider']
if provider != 'osfstorage':
raise Http404(
'File does not exist in OSFStorage ({}: {})'.format(
q, question
))
file_guid = file_info.get('fileId')
if file_guid is None:
node = Node.load(file_info.get('nodeId'))
path = file_info['data'].get('path')
item = FileNode.resolve_class(
provider,
FileNode.FILE
).get_or_create(node, path)
file_guid = item.get_guid(create=True)._id
data[q]['extra'][i]['fileId'] = file_guid
draft.update_metadata(data)
draft.save()
else:
guid = Guid.load(file_guid)
item = guid.referent
if item is None:
raise Http404(
'File with guid "{}" in "{}" does not exist'.format(
file_guid, question
))
yield item
continue
for i, file_info in enumerate(data[q]['value']['uploader']['extra']):
provider = file_info['data']['provider']
if provider != 'osfstorage':
raise Http404(
'File does not exist in OSFStorage ({}: {})'.format(
q, question
))
file_guid = file_info.get('fileId')
if file_guid is None:
node = Node.load(file_info.get('nodeId'))
path = file_info['data'].get('path')
item = FileNode.resolve_class(
provider,
FileNode.FILE
).get_or_create(node, path)
file_guid = item.get_guid(create=True)._id
data[q]['value']['uploader']['extra'][i]['fileId'] = file_guid
draft.update_metadata(data)
draft.save()
else:
guid = Guid.load(file_guid)
item = guid.referent
if item is None:
raise Http404(
'File with guid "{}" in "{}" does not exist'.format(
file_guid, question
))
yield item
开发者ID:brianjgeiger,项目名称:osf.io,代码行数:61,代码来源:views.py
注:本文中的website.files.models.FileNode类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论