Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
504 views
in Technique[技术] by (71.8m points)

python-3.x - ValueError:不能腌制包含指针的ctypes对象(ValueError: ctypes objects containing pointers cannot be pickled)

I am new to Python (2 month of programing/learining exp in total).

(我是Python新手(总共2个月的编程/学习Exp经验)。)

In this code all i do is get some data from MSSQL database and transfer it to DynamDB.

(在此代码中,我要做的就是从MSSQL数据库中获取一些数据并将其传输到DynamDB。)

I just want to know why i getting this error: ValueError: ctypes objects containing pointers cannot be pickled Its happaning it this line : p.map(self.import_sample_data_dynamo, list_to_batch).

(我只想知道为什么会收到此错误:ValueError:不能对包含指针的ctypes对象进行腌制它使这一行发酸:p.map(self.import_sample_data_dynamo,list_to_batch)。)

import_sample_data_dynamo: its a function for Dynamo batch.

(import_sample_data_dynamo:它是Dynamo批处理的功能。)

list_to_batch: its a list of dictionaries.

(list_to_batch:其字典列表。)

can some one plz tell me what I am doint wrong.

(有人可以告诉我我错了吗。)

class GetSensorsSamplesSetToDynamoDBTable:

    def __init__(self):
        self.client = None
        self.db = None

        # MSSQL
        self.connection = None
        self.cursor: Cursor = None

    def init(self):
        # MSSQL
        connect_string = 'Driver={SQL Server};' 
                         'Server=xxxx;' 'Database=xxx;' 
                         'uid=xxx;pwd=xxx'
        self.connection = pypyodbc.connect(connect_string)
        self.cursor = self.connection.cursor()

        # DynamoDB
        dynamodb = boto3.resource('dynamodb')
        self.table = dynamodb.Table('xxx')

    def cleanup(self):
        # MSSQL
        self.cursor.close()
        self.connection.close()

    def do_work(self):
        self.init()
        data = []
        samples = self.get_files_received_by_ftp_prod2_data()
        for sample in samples:
            sample_id = sample['id']
            project_id = sample['projectid']
            sensor_id = sample['sensorid']
            sample_time = sample['sampletime']
            row = {"_id": sample_id, 'ProjectID': project_id, 'SensorID': sensor_id,
                   'Sample_Time': sample_time,
                   'Z_Fields': sample}
            data.append(row)
        self.split_for_batch(data)
        # self.import_sample_data_dynamo(data)

    def get_files_received_by_ftp_prod2_data(self):
        sql_cmd = f"SELECT TOP (1000) * FROM FilesReceivedByFTP_Prod2"
        self.cursor.execute(sql_cmd)
        records = self.cursor.fetchall()
        samples = []
        records = list(records)
        for res in records:
            samples_data = {self.cursor.description[i][0]: res[i] for i in range(len(res))}
            self.fix_bad_fields(samples_data)
            samples.append(samples_data)
        return samples

    def split_for_batch(self, data):
        temp_list = []
        list_to_batch = []

        while len(data) > 0:
            temp_list.append(data[0])
            data.pop(0)
            if len(temp_list) > 24 or len(data) is 0:
                list_to_batch.append(temp_list)
                temp_list = []
                print(len(data))
                print(len(list_to_batch))
        start_time = time.time()
        num_workers = multiprocessing.cpu_count()
        p = Pool(num_workers - 1)
        p.map(self.import_sample_data_dynamo, list_to_batch)
        p.close()
        p.join()
        elapsed = time.time() - start_time
        print(f"read_all_samples elapsed {elapsed:.0F} Seconds")

    def import_sample_data_dynamo(self, data):
        with self.table.batch_writer() as batch:
            for item in data:
                ddb_data = json.loads(json.dumps(item, default=json_util.default),
                                      parse_float=Decimal, object_hook=json_util.object_hook)
                batch.put_item(Item=ddb_data)
        return True

    def fix_bad_fields(self, data):
        for k, v in data.items():
            if v == '':
                data[k] = '---'
            # elif type(v) == type(datetime.datetime.now()):
            #     # data[k] = v.strftime("%d/%m/%Y, %H:%M:%S")
            #     data[k] = v.timestamp()
            elif type(v) is bytearray:
                data[k] = "bytearray"


if __name__ == '__main__':
    freeze_support()
    worker = GetSensorsSamplesSetToDynamoDBTable()
    worker.do_work()
  ask by Denis Golovach translate from so

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)
等待大神答复

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...