本文整理汇总了Python中spreadsheetimport.SpreadsheetImport类的典型用法代码示例。如果您正苦于以下问题:Python SpreadsheetImport类的具体用法?Python SpreadsheetImport怎么用?Python SpreadsheetImport使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SpreadsheetImport类的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: testRecordMultiRowImport2
def testRecordMultiRowImport2(self):
no_signals()
identifier = Field.objects.get(name='identifier', standard__prefix='dc')
title = Field.objects.get(name='title', standard__prefix='dc')
testimport = SpreadsheetImport(StringIO("""Identifier,Title
,Title1
Z001,Title2
,Title3
Z003,Title7
Z002,Title4
Z002,Title5
Z002,Title6
Z003,Title8"""),
[self.collection])
testimport.name_field = 'Identifier'
testimport.run()
self.assertEqual(3, testimport.added)
self.assertEqual(0, testimport.updated)
self.assertEqual(1, testimport.duplicate_in_file_skipped)
self.assertEqual(1, testimport.no_id_skipped)
t1 = self.collection.records.get(name='z001').fieldvalue_set.filter(field=title)
t2 = self.collection.records.get(name='z002').fieldvalue_set.filter(field=title)
t3 = self.collection.records.get(name='z003').fieldvalue_set.filter(field=title)
self.assertEqual('Title2', t1[0].value)
self.assertEqual('Title3', t1[1].value)
self.assertEqual('Title4', t2[0].value)
self.assertEqual('Title5', t2[1].value)
self.assertEqual('Title6', t2[2].value)
self.assertEqual('Title7', t3[0].value)
开发者ID:eResearchSandpit,项目名称:rooibos,代码行数:35,代码来源:tests.py
示例2: testSkipAdds
def testSkipAdds(self):
no_signals()
identifier = Field.objects.get(name='identifier', standard__prefix='dc')
title = Field.objects.get(name='title', standard__prefix='dc')
r1 = Record.objects.create(name='r001')
CollectionItem.objects.create(record=r1, collection=self.collection)
r1.fieldvalue_set.create(field=identifier, value='R001')
r1.fieldvalue_set.create(field=title, value='Title')
testimport = SpreadsheetImport(StringIO("Identifier,Title\nR002,NewTitle1\nR001,NewTitle2"),
[self.collection])
testimport.name_field = 'Identifier'
testimport.run(add=False)
self.assertEqual(0, testimport.added)
self.assertEqual(1, testimport.updated)
self.assertEqual(1, testimport.added_skipped)
self.assertEqual(0, testimport.updated_skipped)
t1 = self.collection.records.get(name='r001').fieldvalue_set.filter(field=title)
t2 = self.collection.records.filter(name='r002')
self.assertEqual('NewTitle2', t1[0].value)
self.assertFalse(t2)
开发者ID:eResearchSandpit,项目名称:rooibos,代码行数:25,代码来源:tests.py
示例3: testKeepSystemFieldValues
def testKeepSystemFieldValues(self):
no_signals()
identifier = Field.objects.get(name='identifier', standard__prefix='dc')
title = Field.objects.get(name='title', standard__prefix='dc')
system = get_system_field()
r1 = Record.objects.create(name='s001')
CollectionItem.objects.create(record=r1, collection=self.collection)
r1.fieldvalue_set.create(field=identifier, value='S001')
r1.fieldvalue_set.create(field=title, value='Title')
r1.fieldvalue_set.create(field=system, value='Keep this')
testimport = SpreadsheetImport(StringIO("Identifier,Title\nS002,NewTitle2\nS001,NewTitle1"),
[self.collection])
testimport.name_field = 'Identifier'
testimport.run(update=True)
self.assertEqual(1, testimport.added)
self.assertEqual(1, testimport.updated)
self.assertEqual(0, testimport.added_skipped)
self.assertEqual(0, testimport.updated_skipped)
t1 = self.collection.records.get(name='s001').fieldvalue_set.filter(field=title)
t2 = self.collection.records.get(name='s002').fieldvalue_set.filter(field=title)
s = self.collection.records.get(name='s001').fieldvalue_set.filter(field=system)
self.assertEqual('NewTitle1', t1[0].value)
self.assertEqual('NewTitle2', t2[0].value)
self.assertEqual('Keep this', s[0].value)
开发者ID:eResearchSandpit,项目名称:rooibos,代码行数:29,代码来源:tests.py
示例4: testTestOnly
def testTestOnly(self):
no_signals()
identifier = Field.objects.get(name='identifier', standard__prefix='dc')
title = Field.objects.get(name='title', standard__prefix='dc')
r1 = Record.objects.create(name='t001')
CollectionItem.objects.create(record=r1, collection=self.collection)
r1.fieldvalue_set.create(field=identifier, value='T001')
r1.fieldvalue_set.create(field=title, value='Title')
testimport = SpreadsheetImport(StringIO("""Identifier,Title
,Title1
T001,Title2
,Title3
T003,Title7
T002,Title4
T002,Title5
T002,Title6
T003,Title8"""),
[self.collection])
testimport.name_field = 'Identifier'
testimport.run(test=True)
self.assertEqual(2, testimport.added)
self.assertEqual(1, testimport.updated)
self.assertEqual(1, testimport.duplicate_in_file_skipped)
self.assertEqual(1, testimport.no_id_skipped)
r = self.collection.records.filter(name__startswith='t')
self.assertEqual(1, r.count())
t1 = self.collection.records.get(name='t001').fieldvalue_set.filter(field=title)
self.assertEqual('Title', t1[0].value)
开发者ID:eResearchSandpit,项目名称:rooibos,代码行数:34,代码来源:tests.py
示例5: testOwnedRecordImport
def testOwnedRecordImport(self):
no_signals()
identifier = Field.objects.get(name='identifier', standard__prefix='dc')
title = Field.objects.get(name='title', standard__prefix='dc')
r1 = Record.objects.create(name='x001')
CollectionItem.objects.create(record=r1, collection=self.collection)
r1.fieldvalue_set.create(field=identifier, value='X001')
r1.fieldvalue_set.create(field=title, value='Standard')
r2 = Record.objects.create(name='x002', owner=self.user)
CollectionItem.objects.create(record=r2, collection=self.collection)
r2.fieldvalue_set.create(field=identifier, value='X002')
r2.fieldvalue_set.create(field=title, value='Owned')
testimport = SpreadsheetImport(StringIO("Identifier,Title\nX001,NewTitle1\nX002,NewTitle2\nX003,NewTitle3"),
[self.collection],
owner=self.user)
testimport.name_field = 'Identifier'
testimport.run()
self.assertEqual(1, testimport.added)
self.assertEqual(1, testimport.updated)
self.assertEqual(1, testimport.owner_skipped)
r1 = self.collection.records.get(name='x001')
r2 = self.collection.records.get(name='x002')
r3 = self.collection.records.get(name='x003')
self.assertEqual(None, r1.owner)
self.assertEqual(self.user, r2.owner)
self.assertEqual(self.user, r3.owner)
self.assertEqual('Standard', r1.title)
self.assertEqual('NewTitle2', r2.title)
self.assertEqual('NewTitle3', r3.title)
开发者ID:eResearchSandpit,项目名称:rooibos,代码行数:34,代码来源:tests.py
示例6: testImport
def testImport(self):
testimport = SpreadsheetImport(StringIO(csv_file), [self.collection])
self.assertFalse(testimport.analyzed)
data = testimport.analyze()
self.assertTrue(testimport.analyzed)
开发者ID:historyadmin,项目名称:rooibos,代码行数:8,代码来源:tests.py
示例7: analyze
def analyze(collections=None, separator=None, separate_fields=None, fieldset=None):
try:
with open(os.path.join(_get_scratch_dir(), _get_filename(request, file)), 'rU') as csvfile:
imp = SpreadsheetImport(csvfile, collections, separator=separator,
separate_fields=separate_fields, preferred_fieldset=fieldset)
return imp, imp.analyze()
except IOError:
raise Http404()
开发者ID:knabar,项目名称:fynbos,代码行数:8,代码来源:views.py
示例8: testNoBOM
def testNoBOM(self):
"""Make sure the import can handle the lack of BOM at the beginning of some UTF-8 files"""
no_signals()
testimport = SpreadsheetImport(StringIO(csv_file), [self.collection])
data = testimport.analyze()
self.assertTrue(testimport.mapping.has_key('ID'))
self.assertTrue(testimport.mapping.has_key('Filename'))
self.assertTrue(testimport.mapping.has_key('Unused'))
self.assertTrue(testimport.mapping.has_key('Title'))
self.assertTrue(testimport.mapping.has_key('Creator'))
self.assertTrue(testimport.mapping.has_key('Location'))
开发者ID:eResearchSandpit,项目名称:rooibos,代码行数:12,代码来源:tests.py
示例9: testImportSimple
def testImportSimple(self):
no_signals()
testimport = SpreadsheetImport(StringIO(csv_file), [self.collection])
self.assertEqual(0, self.collection.records.count())
testimport.analyze()
dc = dict(
identifier=Field.objects.get(name='identifier', standard__prefix='dc'),
title=Field.objects.get(name='title', standard__prefix='dc'),
creator=Field.objects.get(name='creator', standard__prefix='dc'),
coverage=Field.objects.get(name='coverage', standard__prefix='dc'),
)
testimport.mapping = {
'ID': dc['identifier'],
'Filename': None,
'Title': dc['title'],
'Creator': dc['creator'],
'Location': dc['coverage'],
}
testimport.name_field = 'ID'
self.assertNotEqual(None, testimport.get_identifier_field())
testimport.run()
self.assertEquals(2, self.collection.records.count())
r1 = self.collection.records.get(name='A001'.lower())
self.assertEqual('A001', r1.fieldvalue_set.get(field=dc['identifier']).value)
开发者ID:eResearchSandpit,项目名称:rooibos,代码行数:30,代码来源:tests.py
示例10: test_split_values_import
def test_split_values_import(self):
testimport = SpreadsheetImport(
StringIO("ID,Split,NoSplit\nA999,a;b,a;b"), [self.collection])
testimport.analyze()
dc = dict(
identifier=Field.objects.get(
name='identifier', standard__prefix='dc'),
title=Field.objects.get(name='title', standard__prefix='dc'),
creator=Field.objects.get(name='creator', standard__prefix='dc'),
)
testimport.mapping = {
'ID': dc['identifier'],
'Split': dc['title'],
'NoSplit': dc['creator'],
}
testimport.name_field = 'ID'
testimport.separate_fields = {
'Split': True,
}
testimport.run()
r = self.collection.records.get(name='A999'.lower())
self.assertEqual(
'a',
r.fieldvalue_set.filter(field=testimport.mapping['Split'])[0].value
)
self.assertEqual(
'b',
r.fieldvalue_set.filter(field=testimport.mapping['Split'])[1].value
)
self.assertEqual(
'a;b',
r.fieldvalue_set.filter(
field=testimport.mapping['NoSplit'])[0].value
)
开发者ID:hanleybrand,项目名称:rooibos,代码行数:34,代码来源:tests.py
示例11: testBOM
def testBOM(self):
"""Make sure the import can handle the BOM at the beginning of some UTF-8 files"""
BOM = "\xef\xbb\xbf"
testimport = SpreadsheetImport(StringIO(BOM + csv_file), [self.collection])
data = testimport.analyze()
self.assertTrue(testimport.mapping.has_key('ID'))
self.assertTrue(testimport.mapping.has_key('Filename'))
self.assertTrue(testimport.mapping.has_key('Unused'))
self.assertTrue(testimport.mapping.has_key('Title'))
self.assertTrue(testimport.mapping.has_key('Creator'))
self.assertTrue(testimport.mapping.has_key('Location'))
开发者ID:historyadmin,项目名称:rooibos,代码行数:14,代码来源:tests.py
示例12: test_no_bom
def test_no_bom(self):
"""Make sure the import can handle the lack of BOM at the beginning
of some UTF-8 files"""
testimport = SpreadsheetImport(StringIO(csv_file), [self.collection])
testimport.analyze()
self.assertTrue('ID' in testimport.mapping)
self.assertTrue('Filename' in testimport.mapping)
self.assertTrue('Unused' in testimport.mapping)
self.assertTrue('Title' in testimport.mapping)
self.assertTrue('Creator' in testimport.mapping)
self.assertTrue('Location' in testimport.mapping)
开发者ID:hanleybrand,项目名称:rooibos,代码行数:14,代码来源:tests.py
示例13: testRecordMultiRowImport
def testRecordMultiRowImport(self):
identifier = Field.objects.get(name='identifier', standard__prefix='dc')
title = Field.objects.get(name='title', standard__prefix='dc')
testimport = SpreadsheetImport(StringIO("Identifier,Title\nY001,Title1\n,Title2"),
[self.collection])
testimport.name_field = 'Identifier'
testimport.run()
self.assertEqual(1, testimport.added)
self.assertEqual(0, testimport.no_id_skipped)
r1 = self.collection.records.get(name='y001')
titles = r1.fieldvalue_set.filter(field=title)
self.assertEqual('Title1', titles[0].value)
self.assertEqual('Title2', titles[1].value)
开发者ID:historyadmin,项目名称:rooibos,代码行数:17,代码来源:tests.py
示例14: test_analyze
def test_analyze(self):
testimport = SpreadsheetImport(StringIO(csv_file), [self.collection])
self.assertFalse(testimport.analyzed)
data = testimport.analyze()
self.assertTrue(testimport.analyzed)
self.assertEqual(2, len(data))
self.assertEqual('A001', data[0]['ID'][0])
self.assertEqual('a001.jpg', data[0]['Filename'][0])
self.assertEqual('Test', data[0]['Title'][0])
self.assertEqual('Knab, Andreas', data[0]['Creator'][0])
self.assertEqual('Harrisonburg, VA', data[0]['Location'][0])
self.assertEqual(None, data[0]['Unused'])
self.assertEqual('A002', data[1]['ID'][0])
self.assertEqual('a002.jpg', data[1]['Filename'][0])
self.assertEqual('Another Test', data[1]['Title'][0])
self.assertEqual('Andreas Knab;John Doe', data[1]['Creator'][0])
self.assertEqual('Virginia', data[1]['Location'][0])
self.assertEqual(None, data[1]['Unused'])
# These don't match anything
self.assertEqual(None, testimport.mapping['ID'])
self.assertEqual(None, testimport.mapping['Filename'])
self.assertEqual(None, testimport.mapping['Unused'])
# These should match standards fields
self.assertNotEqual(None, testimport.mapping['Title'])
self.assertNotEqual(None, testimport.mapping['Creator'])
self.assertNotEqual(None, testimport.mapping['Location'])
self.assertEqual(None, testimport.get_identifier_field())
# Map the ID field and try again
testimport.mapping['ID'] = Field.objects.get(
name='identifier', standard__prefix='dc')
self.assertEqual('ID', testimport.get_identifier_field())
开发者ID:hanleybrand,项目名称:rooibos,代码行数:42,代码来源:tests.py
示例15: testUpdateNames
def testUpdateNames(self):
identifier = Field.objects.get(name='identifier', standard__prefix='dc')
title = Field.objects.get(name='title', standard__prefix='dc')
r1 = Record.objects.create(name='old-title')
CollectionItem.objects.create(record=r1, collection=self.collection)
r1.fieldvalue_set.create(field=identifier, value='D001')
r1.fieldvalue_set.create(field=title, value='Old Title')
testimport = SpreadsheetImport(StringIO("Identifier,Title\nD001,New Title 1\nD001,New Title 2"),
[self.collection])
testimport.name_field = 'Title'
testimport.run(update_names=True)
self.assertEqual(0, testimport.added)
self.assertEqual(1, testimport.updated)
self.assertEqual(0, testimport.duplicate_in_file_skipped)
r1 = Record.objects.get(id=r1.id)
self.assertEqual('new-title-1', r1.name)
开发者ID:historyadmin,项目名称:rooibos,代码行数:20,代码来源:tests.py
示例16: testFindDuplicateIdentifiers
def testFindDuplicateIdentifiers(self):
testimport = SpreadsheetImport(StringIO(), [self.collection])
dup = testimport.find_duplicate_identifiers()
self.assertEqual(0, len(dup))
dcidentifier = Field.objects.get(name='identifier', standard__prefix='dc')
def create_record(id):
record = Record.objects.create()
self.records.append(record)
record.fieldvalue_set.create(field=dcidentifier, value=id)
CollectionItem.objects.create(record=record, collection=self.collection)
create_record('X001')
create_record('X002')
create_record('X002')
dup = testimport.find_duplicate_identifiers()
self.assertEqual(1, len(dup))
self.assertEqual('X002', dup[0])
开发者ID:historyadmin,项目名称:rooibos,代码行数:21,代码来源:tests.py
示例17: csvimport
def csvimport(job):
log.debug('csvimport started for %s' % job)
jobinfo = JobInfo.objects.get(id=job.arg)
try:
arg = json.loads(jobinfo.arg)
if jobinfo.status.startswith == 'Complete':
# job finished previously
log.debug('csvimport finished previously for %s' % job)
return
file = os.path.join(_get_scratch_dir(), arg['file'])
if not os.path.exists(file):
# import file missing
log.exception('Import file %s missing', file)
jobinfo.complete('Import file missing', 'Import failed')
resultfile = file + '.result'
if os.path.exists(resultfile):
# import must have died in progress
with open(resultfile, 'r') as f:
results = csv.DictReader(f)
count = -1
for count, row in enumerate(results):
pass
skip_rows = count + 1
else:
skip_rows = 0
infile = open(file, 'rU')
outfile = open(resultfile, 'a', 0)
outwriter = csv.writer(outfile)
if not skip_rows:
outwriter.writerow(['Identifier', 'Action'])
class Counter(object):
def __init__(self):
self.counter = 0
def create_handler(event, counter_obj):
# TODO: pycharm is raising 'unresolved reference' errors for counter -
# it's dumb, but I guess it what ws happening confused me for a second
# so I refactored the funcion arg to be counter_obj, to make clear that
# a Counter is being passed
def handler(id):
counter_obj.counter += 1
jobinfo.update_status('processing row %s' % counter_obj.counter)
outwriter.writerow([';'.join(id) if id else '', event])
log.debug('create_handler: %s' % handler)
return handler
counter = Counter()
handlers = dict(
(e, create_handler(e, counter)) for e in SpreadsheetImport.events)
fieldset = FieldSet.objects.filter(
id=arg['fieldset']) if arg['fieldset'] else None
collections = Collection.objects.filter(id__in=arg['collections'])
imp = SpreadsheetImport(
infile,
collections,
separator=arg['separator'],
owner=jobinfo.owner if arg['personal'] else None,
preferred_fieldset=fieldset[0] if fieldset else None,
mapping=arg['mapping'],
separate_fields=arg['separate_fields'],
labels=arg['labels'],
order=arg['order'],
hidden=arg['hidden'],
**handlers
)
log.debug('csvimport calling run() for %s' % job)
log.debug('skip_rows = %s' % skip_rows)
imp.run(arg['update'],
arg['add'],
arg['test'],
collections,
skip_rows=skip_rows)
log.info('csvimport complete: %s' % job)
jobinfo.complete('Complete', '%s rows processed' % counter.counter)
except Exception as ex:
log.exception('csvimport failed: %s' % job)
log.exception(traceback.format_exc())
jobinfo.complete('Failed: %s\n%s' % (ex, traceback.format_exc()), None)
开发者ID:eResearchSandpit,项目名称:rooibos,代码行数:95,代码来源:workers.py
示例18: csvimport
def csvimport(job):
logging.debug('csvimport started for %s' % job)
jobinfo = JobInfo.objects.get(id=job.arg)
try:
arg = simplejson.loads(jobinfo.arg)
if jobinfo.status.startswith == 'Complete':
# job finished previously
logging.debug('csvimport finished previously for %s' % job)
return
file = os.path.join(_get_scratch_dir(), arg['file'])
if not os.path.exists(file):
# import file missing
jobinfo.complete('Import file missing', 'Import failed')
resultfile = file + '.result'
if os.path.exists(resultfile):
# import must have died in progress
with open(resultfile, 'r') as f:
results = csv.DictReader(f)
count = -1
for count, row in enumerate(results):
pass
skip_rows = count + 1
else:
skip_rows = 0
infile = open(file, 'rU')
outfile = open(resultfile, 'a', 0)
outwriter = csv.writer(outfile)
if not skip_rows:
outwriter.writerow(['Identifier', 'Action'])
class Counter(object):
def __init__(self):
self.counter = 0
def create_handler(event, counter):
def handler(id):
counter.counter += 1
jobinfo.update_status('processing row %s' % counter.counter)
outwriter.writerow(
[';'.join(id).encode('utf-8') if id else '', event])
return handler
counter = Counter()
handlers = dict(
(e, create_handler(e, counter)) for e in SpreadsheetImport.events)
fieldset = FieldSet.objects.filter(
id=arg['fieldset']) if arg['fieldset'] else None
collections = Collection.objects.filter(id__in=arg['collections'])
imp = SpreadsheetImport(
infile,
collections,
separator=arg['separator'],
owner=jobinfo.owner if arg['personal'] else None,
preferred_fieldset=fieldset[0] if fieldset else None,
mapping=arg['mapping'],
separate_fields=arg['separate_fields'],
labels=arg['labels'],
order=arg['order'],
hidden=arg['hidden'],
refinements=arg['refinements'],
**handlers
)
logging.debug('csvimport calling run() for %s' % job)
imp.run(arg['update'],
arg['add'],
arg['test'],
collections,
skip_rows=skip_rows)
logging.info('csvimport complete: %s' % job)
jobinfo.complete('Complete', '%s rows processed' % counter.counter)
except Exception, ex:
logging.exception('csvimport failed: %s' % job)
jobinfo.complete('Failed: %s' % ex, None)
开发者ID:hanleybrand,项目名称:rooibos,代码行数:91,代码来源:workers.py
注:本文中的spreadsheetimport.SpreadsheetImport类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论