
Python pymysqlreplication.BinLogStreamReader Class Code Examples


This article collects typical usage examples of the pymysqlreplication.BinLogStreamReader class in Python. If you are wondering what BinLogStreamReader does, or how and where to use it, the selected examples below should help.



The following presents 20 code examples of the BinLogStreamReader class, ordered by default according to popularity.
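Before the examples, here is a minimal sketch of how BinLogStreamReader is typically wired up. The MYSQL_SETTINGS dict referenced throughout the examples below is an assumption modeled on the library's documentation; the MySQL server must also run with a row-format binlog for row events to appear.

# A minimal sketch, assuming a local MySQL server with the binlog enabled
# in ROW format (my.cnf: server-id=1, log-bin=mysql-bin, binlog_format=ROW).
from pymysqlreplication import BinLogStreamReader

# Connection settings of the shape referenced as MYSQL_SETTINGS below.
MYSQL_SETTINGS = {
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "passwd": "",
}

stream = BinLogStreamReader(
    connection_settings=MYSQL_SETTINGS,
    server_id=100,  # slave identifier; must be unique per replica
)
for binlogevent in stream:
    binlogevent.dump()  # print a human-readable summary of each event
stream.close()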

Example 1: main

def main():
    # connect to rethinkdb and register the connection as the default
    rethinkdb.connect("localhost", 28015, "mysql").repl()
    try:
        rethinkdb.db_drop("mysql").run()
    except Exception:
        pass  # the database may not exist yet
    rethinkdb.db_create("mysql").run()

    tables = ["dept_emp", "dept_manager", "titles",
              "salaries", "employees", "departments"]
    for table in tables:
        rethinkdb.db("mysql").table_create(table).run()

    stream = BinLogStreamReader(
        connection_settings=MYSQL_SETTINGS,
        blocking=True,
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
    )

    # process feed
    for binlogevent in stream:
        if not isinstance(binlogevent, WriteRowsEvent):
            continue

        for row in binlogevent.rows:
            if binlogevent.schema != "employees":
                continue

            vals = {str(k): str(v) for k, v in row["values"].items()}
            rethinkdb.table(binlogevent.table).insert(vals).run()

    stream.close()
Author: Affirm | Project: python-mysql-replication | Lines: 34 | Source: rethinkdb_sync.py


Example 2: Listener

class Listener(object):
    def __init__(self, connection_settings, server_id, blocking=True, resume_stream=True):
        self._stream = BinLogStreamReader(
            connection_settings=connection_settings,
            server_id=server_id,
            only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
            blocking=blocking,
            resume_stream=resume_stream
        )

    def __del__(self):
        self._stream.close()

    def accept(self, callback):
        for log in self._stream:
            for row in log.rows:
                fields = {}
                method = ''
                if isinstance(log, DeleteRowsEvent):
                    fields = row["values"]
                    method = 'DELETE'
                elif isinstance(log, UpdateRowsEvent):
                    fields = row["after_values"]
                    method = 'UPDATE'
                elif isinstance(log, WriteRowsEvent):
                    method = 'INSERT'
                    fields = row["values"]

                logger.debug(
                    "captured mysql %r event, values: %r",
                    method, json.dumps(fields)
                )
                callback(log.schema, log.table, method, fields)
Author: JianfuLi | Project: hamal | Lines: 33 | Source: listener.py
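A short usage sketch for the Listener above. The connection settings and the callback are hypothetical, and `logger` is assumed to be a module-level logging.Logger in the original file.

# Hypothetical usage of Listener; settings and callback are illustrative.
def on_change(schema, table, method, fields):
    print(schema, table, method, fields)

listener = Listener(
    connection_settings={"host": "127.0.0.1", "port": 3306,
                         "user": "root", "passwd": ""},
    server_id=101,
)
listener.accept(on_change)  # blocks, invoking on_change once per changed row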


Example 3: proc_binlog

	def proc_binlog(self):
		stream = BinLogStreamReader(
			connection_settings = self.config['mysql'],
			server_id = self.config['slave']['server_id'],
			log_file = self.log_file,
			log_pos = self.log_pos,
			only_schemas = self.config['slave']['schemas'],
			blocking = True,
			resume_stream = bool(self.log_file and self.log_pos),
			only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent]
		)
		for binlogevent in stream:
			#binlogevent.dump()
			self.log_file = stream.log_file
			self.log_pos  = stream.log_pos
			for row in binlogevent.rows:
				pk = binlogevent.primary_key
				table = binlogevent.table
				schema = binlogevent.schema
				if isinstance(binlogevent, WriteRowsEvent):
					yield self.es.index_op(self._format(row['values']), doc_type=table, index=schema, id=row['values'][pk])
				elif isinstance(binlogevent, UpdateRowsEvent):
					yield self.es.update_op(self._format(row['after_values']), doc_type=table, index=schema, id=row['after_values'][pk])
				elif isinstance(binlogevent, DeleteRowsEvent):
					yield self.es.delete_op(doc_type=table, index=schema, id=row['values'][pk])
				else:
					continue

		stream.close()
Author: xhook7 | Project: py-mysql-es | Lines: 29 | Source: sync.py
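The method above is a generator that yields one Elasticsearch operation per row event while remembering its binlog position. A hypothetical driver loop, assuming the `self.es` wrapper produces standard bulk action dicts compatible with the elasticsearch-py bulk helpers:

from elasticsearch import Elasticsearch, helpers

def run_sync(syncer):
    es = Elasticsearch("http://localhost:9200")
    # stream bulk actions straight from the binlog into Elasticsearch
    for ok, result in helpers.streaming_bulk(es, syncer.proc_binlog()):
        if not ok:
            print("bulk operation failed:", result)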


Example 4: main

def main():
    stream = BinLogStreamReader(
        connection_settings=MYSQL_SETTINGS,
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])

    for binlogevent in stream:
        for row in binlogevent.rows:
            event = {}
            event["schema"] = binlogevent.schema
            event["table"] = binlogevent.table

            if isinstance(binlogevent, DeleteRowsEvent):
                event["action"] = "delete"
                event.update(row["values"])
            elif isinstance(binlogevent, UpdateRowsEvent):
                event["action"] = "update"
                event.update(row["after_values"])
            elif isinstance(binlogevent, WriteRowsEvent):
                event["action"] = "insert"
                event.update(row["values"])
            print(json.dumps(event))
            sys.stdout.flush()

    stream.close()
Author: Affirm | Project: python-mysql-replication | Lines: 25 | Source: mysql_to_logstash.py


Example 5: main

def main():
    rclient = redis.from_url(redis_url)
    cache = rcache.Rcache(cache_url, server_id)
 
    log_file = rclient.get("log_file")
    log_pos = rclient.get("log_pos")
    log_pos = int(log_pos) if log_pos else None
 
    only_events = _trans_events(events)
    only_events.append(RotateEvent)
 
    stream = BinLogStreamReader(
        connection_settings=mysql_settings,
        server_id=server_id,
        blocking=blocking,
        only_events=only_events,                                                                                                                                                       
        only_tables=tables,
        only_schemas=schemas,
        resume_stream=True,  # for resuming
        freeze_schema=False, # do not support alter table event for faster
        log_file=log_file,
        log_pos=log_pos)
    row_count = 0
 
    for binlogevent in stream:
        if int(time.time()) - binlogevent.timestamp > binlog_max_latency:
            logger.warning("latency[{}] too large".format(
                int(time.time()) - binlogevent.timestamp))
        logger.debug("catch {}".format(binlogevent.__class__.__name__))
        if isinstance(binlogevent, RotateEvent):  # listen for binlog file rotation
            rclient.set("log_file", binlogevent.next_binlog)
            rclient.set("log_pos", binlogevent.position)
            logger.info("log_file:{}, log_position:{}".format(
                binlogevent.next_binlog, binlogevent.position))
        else:
            row_count += 1
            table = "%s.%s" % (binlogevent.schema, binlogevent.table)
            vals_lst = _get_row_values(binlogevent)
            if not binlogevent.primary_key:
                tables_without_primary_key.get(table, None)
            try:
                cache.save(table, binlogevent.primary_key, vals_lst)
                logger.debug("save {} {} rows to cache".format(
                    table, len(vals_lst)))
            except rcache.SaveIgnore as err:
                logger.warning(str(err))
            except rcache.FullError as err:
                logger.info("cache OOM occured: {}.trigger dump command".format(
                    str(err)))
                dump_code = _trigger_dumping()
                cache.save(table, binlogevent.primary_key, vals_lst)
            if cache_max_rows and cache.size > cache_max_rows:
                logger.info("cache size:{} >= {}, trigger dumping".format(
                   cache.size, cache_max_rows))
                _trigger_dumping()
            rclient.set("log_pos", binlogevent.packet.log_pos)
        if row_count % 1000 == 0:
            logger.info("save {} changed rows".format(row_count))
 
    stream.close()
Author: dlf412 | Project: mysql-cdc-redis | Lines: 60 | Source: cdc.py
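The helpers `_trans_events`, `_get_row_values`, and `_trigger_dumping` are not shown in this excerpt. A minimal sketch of what `_get_row_values` might look like, assuming it normalizes the three row-event payload shapes into a list of value dicts:

from pymysqlreplication.row_event import (
    DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent)

def _get_row_values(binlogevent):
    # Hypothetical reconstruction: normalize row payloads per event type.
    if isinstance(binlogevent, UpdateRowsEvent):
        return [row["after_values"] for row in binlogevent.rows]
    # WriteRowsEvent and DeleteRowsEvent both carry plain "values"
    return [row["values"] for row in binlogevent.rows]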


Example 6: main

def main():
    utils.drop_privileges()
    if BinLogStreamReader is None:
        utils.err("error: Python module `pymysqlreplication' is missing")
        return 1
    settings = zabbix_bridge_conf.get_settings()

    # Set blocking to True if you want to block and wait for the next event at
    # the end of the stream
    stream = BinLogStreamReader(connection_settings=settings['mysql'],
                                server_id=settings['slaveid'],
                                only_events=[WriteRowsEvent],
                                resume_stream=True,
                                blocking=True)

    db_filename = settings['sqlitedb']
    dbcache = sqlite3.connect(':memory:')
    cachecur = dbcache.cursor()
    cachecur.execute("ATTACH DATABASE '%s' as 'dbfile'" % (db_filename,))
    cachecur.execute('CREATE TABLE zabbix_cache AS SELECT * FROM dbfile.zabbix_cache')
    cachecur.execute('CREATE UNIQUE INDEX uniq_zid on zabbix_cache (id)')

    # tcollector.zabbix_bridge namespace for internal Zabbix bridge metrics.
    log_pos = 0
    key_lookup_miss = 0
    sample_last_ts = int(time.time())
    last_key_lookup_miss = 0

    for binlogevent in stream:
        if binlogevent.schema == settings['mysql']['db']:
            table = binlogevent.table
            log_pos = binlogevent.packet.log_pos
            if table == 'history' or table == 'history_uint':
                for row in binlogevent.rows:
                    r = row['values']
                    itemid = r['itemid']
                    cachecur.execute('SELECT id, key, host, proxy FROM zabbix_cache WHERE id=?', (itemid,))
                    row = cachecur.fetchone()
                    if (row is not None):
                        print("zbx.%s %d %s host=%s proxy=%s" % (row[1], r['clock'], r['value'], row[2], row[3]))
                        if ((int(time.time()) - sample_last_ts) > settings['internal_metric_interval']): # Sample internal metrics @ 10s intervals
                            sample_last_ts = int(time.time())
                            print("tcollector.zabbix_bridge.log_pos %d %s" % (sample_last_ts, log_pos))
                            print("tcollector.zabbix_bridge.key_lookup_miss %d %s" % (sample_last_ts, key_lookup_miss))
                            print("tcollector.zabbix_bridge.timestamp_drift %d %s" % (sample_last_ts, (sample_last_ts - r['clock'])))
                            if ((key_lookup_miss - last_key_lookup_miss) > settings['dbrefresh']):
                                print("tcollector.zabbix_bridge.key_lookup_miss_reload %d %s" % (sample_last_ts, (key_lookup_miss - last_key_lookup_miss)))
                                cachecur.execute('DROP TABLE zabbix_cache')
                                cachecur.execute('CREATE TABLE zabbix_cache AS SELECT * FROM dbfile.zabbix_cache')
                                cachecur.execute('CREATE UNIQUE INDEX uniq_zid on zabbix_cache (id)')
                                last_key_lookup_miss = key_lookup_miss
                    else:
                        # TODO: Consider https://wiki.python.org/moin/PythonDecoratorLibrary#Retry
                        utils.err("error: Key lookup miss for %s" % (itemid))
                        key_lookup_miss += 1
                sys.stdout.flush()

    dbcache.close()
    stream.close()
Author: OpenTSDB | Project: tcollector | Lines: 59 | Source: zabbix_bridge.py


Example 7: main

def main():
  global repLogFile
  global repLogPosition
  global repLogConfig

  graphiteConfig = readGraphiteConfig()

  try:
    print("Start")
    sock = socket.socket()
    sock.connect((CARBON_SERVER, CARBON_PORT))
    print('Carbon socket opened.')
    stream = BinLogStreamReader(
        connection_settings=MYSQL_SETTINGS,
        server_id=2,  # server id needs to be unique
        only_events=[WriteRowsEvent, DeleteRowsEvent, UpdateRowsEvent],
        blocking=True,
        log_file=repLogFile,
        log_pos=repLogPosition,
        resume_stream=repLogPosition is not None)
    print("Binlog stream opened")

    for binlogevent in stream:
      # put replication log file and position in variables so we can save them later
      repLogFile = stream.log_file
      repLogPosition = stream.log_pos
      # also check for changes in graphite configuration and read again if needed
      if binlogevent.schema == "weather" and binlogevent.table == "graphite":
        graphiteConfig = readGraphiteConfig()
      # this is the data we are interested in
      if binlogevent.schema == "weather" and binlogevent.table == "data":

        for row in binlogevent.rows:
          # we only care about inserts
          if isinstance(binlogevent, WriteRowsEvent):
            vals = row["values"]
            # check if the sensor is one that we have configuration for
            if vals["sensorid"] in graphiteConfig:
              conf = graphiteConfig[vals["sensorid"]]
              value = float(vals["value"])
              # do a conversion if needed
              if conf["formula"] is not None and conf["formula"] != "":
                value = eval(conf["formula"], {"__builtins__": {}}, {"value": value, "round": round})
              # construct the message and send it to carbon
              message = '%s %f %d\n' % (conf["graphitepath"], value, round((vals["time"] - _EPOCH).total_seconds()))
              sock.sendall(message.encode())  # carbon expects bytes on the socket
              print(str(vals["sensorid"]), str(vals["time"]), str(value))
              print(message)

  except KeyboardInterrupt:
    # close open connections
    stream.close()
    sock.close()
    # save replication log position
    repLogConfig.set('replicationlog', 'file', repLogFile)
    repLogConfig.set('replicationlog', 'position', str(repLogPosition))
    with open('replogposition.ini', 'w') as f:
      repLogConfig.write(f)
Author: mika-koivusaari | Project: mysql_to_graphite | Lines: 58 | Source: mysql_to_graphite.py


Example 8: mysql_stream

def mysql_stream(conf, mongo, queue_out):
    logger = logging.getLogger(__name__)

    # server_id is your slave identifier, it should be unique.
    # set blocking to True if you want to block and wait for the next event at
    # the end of the stream
    mysql_settings = {
        "host": conf['host'],
        "port": conf.getint('port'),
        "user": conf['user'],
        "passwd": conf['password']
    }

    last_log = mongo.get_log_pos()
    if last_log['log_file'] == 'NA':
        log_file = None
        log_pos = None
        resume_stream = False
    else:
        log_file = last_log['log_file']
        log_pos = int(last_log['log_pos'])
        resume_stream = True

    stream = BinLogStreamReader(connection_settings=mysql_settings,
                                server_id=conf.getint('slaveid'),
                                only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
                                blocking=True,
                                resume_stream=resume_stream,
                                log_file=log_file,
                                log_pos=log_pos,
                                only_schemas=conf['databases'].split(','))

    for binlogevent in stream:
        binlogevent.dump()
        schema = "%s" % binlogevent.schema
        table = "%s" % binlogevent.table

        for row in binlogevent.rows:
            if isinstance(binlogevent, DeleteRowsEvent):
                vals = row["values"]
                event_type = 'delete'
            elif isinstance(binlogevent, UpdateRowsEvent):
                vals = dict()
                vals["before"] = row["before_values"]
                vals["after"] = row["after_values"]
                event_type = 'update'
            elif isinstance(binlogevent, WriteRowsEvent):
                vals = row["values"]
                event_type = 'insert'

            seqnum = mongo.write_to_queue(event_type, vals, schema, table)
            mongo.write_log_pos(stream.log_file, stream.log_pos)
            queue_out.put({'seqnum': seqnum})
            logger.debug(row)
            logger.debug(stream.log_pos)
            logger.debug(stream.log_file)

    stream.close()
Author: njordr | Project: mymongo | Lines: 58 | Source: mysql.py


Example 9: TestCTLConnectionSettings

class TestCTLConnectionSettings(base.PyMySQLReplicationTestCase):

    def setUp(self):
        super(TestCTLConnectionSettings, self).setUp()
        self.stream.close()
        ctl_db = copy.copy(self.database)
        ctl_db["db"] = None
        ctl_db["port"] = 3307
        self.ctl_conn_control = pymysql.connect(**ctl_db)
        self.ctl_conn_control.cursor().execute("DROP DATABASE IF EXISTS pymysqlreplication_test")
        self.ctl_conn_control.cursor().execute("CREATE DATABASE pymysqlreplication_test")
        self.ctl_conn_control.close()
        ctl_db["db"] = "pymysqlreplication_test"
        self.ctl_conn_control = pymysql.connect(**ctl_db)
        self.stream = BinLogStreamReader(
            self.database,
            ctl_connection_settings=ctl_db,
            server_id=1024,
            only_events=(WriteRowsEvent,),
            fail_on_table_metadata_unavailable=True
        )

    def tearDown(self):
        super(TestCTLConnectionSettings, self).tearDown()
        self.ctl_conn_control.close()

    def test_seperate_ctl_settings_table_metadata_unavailable(self):
        self.execute("CREATE TABLE test (id INTEGER(11))")
        self.execute("INSERT INTO test VALUES (1)")
        self.execute("COMMIT")

        had_error = False
        try:
            event = self.stream.fetchone()
        except TableMetadataUnavailableError as e:
            had_error = True
            assert "test" in e.args[0]
        finally:
            self.resetBinLog()
            assert had_error

    def test_seperate_ctl_settings_no_error(self):
        self.execute("CREATE TABLE test (id INTEGER(11))")
        self.execute("INSERT INTO test VALUES (1)")
        self.execute("DROP TABLE test")
        self.execute("COMMIT")
        self.ctl_conn_control.cursor().execute("CREATE TABLE test (id INTEGER(11))")
        self.ctl_conn_control.cursor().execute("INSERT INTO test VALUES (1)")
        self.ctl_conn_control.cursor().execute("COMMIT")
        try:
            self.stream.fetchone()
        except Exception as e:
            self.fail("raised unexpected exception: {exception}".format(exception=e))
        finally:
            self.resetBinLog()
Author: kdparker | Project: python-mysql-replication | Lines: 55 | Source: test_basic.py
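A brief note on the test above: `ctl_connection_settings` gives BinLogStreamReader a separate control connection for looking up table metadata (column names and types), distinct from the connection that streams the binlog. That is why dropping the table on the streamed server while recreating it on the control server avoids the TableMetadataUnavailableError in the second test.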


Example 10: main

def main():
    # server_id is your slave identifier, it should be unique.
    # set blocking to True if you want to block and wait for the next event at
    # the end of the stream
    stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS,
                                server_id=3,
                                blocking=True)

    for binlogevent in stream:
        binlogevent.dump()

    stream.close()
Author: atopos0627 | Project: Handle_jsonData_inMysql | Lines: 12 | Source: Handle_jsonData_inMysql.py


Example 11: consume_events

def consume_events():
    stream = BinLogStreamReader(connection_settings=database,
                                server_id=3,
                                resume_stream=False,
                                blocking=True,
                                only_events=[UpdateRowsEvent],
                                only_tables=['test'])
    start = time.perf_counter()  # time.clock() was removed in Python 3.8
    i = 0.0
    for binlogevent in stream:
        i += 1.0
        if i % 1000 == 0:
            print("%d events per second (%d total)" % (i / (time.perf_counter() - start), i))
    stream.close()
Author: 0xcc | Project: python-mysql-replication | Lines: 14 | Source: benchmark.py


Example 12: main

def main():
    # server_id is your slave identifier, it should be unique.
    # set blocking to True if you want to block and wait for the next event at
    # the end of the stream
    stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS,
                                server_id=3,
                                log_file="mysql-bin.000002",
                                blocking=True,
                                only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])

    for binlogevent in stream:
        binlogevent.dump()

    stream.close()
Author: 3manuek | Project: python-mysql-replication | Lines: 14 | Source: dump_events_only.py


Example 13: test_log_pos_handles_disconnects

    def test_log_pos_handles_disconnects(self):
        self.stream = BinLogStreamReader(
            connection_settings=self.database,
            resume_stream=True
        )

        query = "CREATE TABLE test (id INT  PRIMARY KEY AUTO_INCREMENT, data VARCHAR (50) NOT NULL)"
        self.execute(query)
        query = "INSERT INTO test (data) VALUES('Hello')"
        self.execute(query)
        self.execute("COMMIT")

        self.assertIsInstance(self.stream.fetchone(), RotateEvent)

        self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
        self.assertGreater(self.stream.log_pos, 0)

        self.assertIsInstance(self.stream.fetchone(), QueryEvent)
        self.assertIsInstance(self.stream.fetchone(), QueryEvent)
        self.assertIsInstance(self.stream.fetchone(), TableMapEvent)
        self.assertIsInstance(self.stream.fetchone(), WriteRowsEvent)
        self.assertIsInstance(self.stream.fetchone(), XidEvent)

        self.assertIsNone(self.stream.fetchone())
        self.assertIsInstance(self.stream.fetchone(), RotateEvent)

        self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
        self.assertGreater(self.stream.log_pos, 0)
Author: joeyzhu0422 | Project: python-mysql-replication | Lines: 28 | Source: test_basic.py


Example 14: test_alter_column

    def test_alter_column(self):
        self.stream.close()
        self.execute("CREATE TABLE test_alter_column (id INTEGER(11), data VARCHAR(50))")
        self.execute("INSERT INTO test_alter_column VALUES (1, 'A value')")
        self.execute("COMMIT")
        # this is a problem only when a column is added in a position other than at the end
        self.execute("ALTER TABLE test_alter_column ADD COLUMN another_data VARCHAR(50) AFTER id")
        self.execute("INSERT INTO test_alter_column VALUES (2, 'Another value', 'A value')")
        self.execute("COMMIT")

        self.stream = BinLogStreamReader(
            self.database,
            server_id=1024,
            only_events=(WriteRowsEvent,),
            )
        event = self.stream.fetchone()  # insert with two values
        # both of these asserts fail because of the underlying problem described in issue #118:
        # since table schema info is fetched after the ALTER TABLE, the reader wrongly assumes
        # the second column of the first insert is 'another_data'
        # ER: {'id': 1, 'data': 'A value'}
        # AR: {'id': 1, 'another_data': 'A value'}
        self.assertIn("data", event.rows[0]["values"])
        self.assertNotIn("another_data", event.rows[0]["values"])
        self.assertEqual(event.rows[0]["values"]["data"], 'A value')
        self.stream.fetchone()  # insert with three values
Author: ruiaylin | Project: python-mysql-replication | Lines: 25 | Source: test_basic.py


Example 15: test_log_pos

    def test_log_pos(self):
        query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
        self.execute(query)
        query = "INSERT INTO test (data) VALUES('Hello')"
        self.execute(query)
        self.execute("COMMIT")

        for i in range(6):
            self.stream.fetchone()
        # record position after insert
        log_file, log_pos = self.stream.log_file, self.stream.log_pos

        query = "UPDATE test SET data = 'World' WHERE id = 1"
        self.execute(query)
        self.execute("COMMIT")

        # resume stream from previous position
        if self.stream is not None:
            self.stream.close()
        self.stream = BinLogStreamReader(
            connection_settings=self.database,
            resume_stream=True,
            log_file=log_file,
            log_pos=log_pos
        )

        self.assertIsInstance(self.stream.fetchone(), RotateEvent)
        self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
        self.assertIsInstance(self.stream.fetchone(), XidEvent)
        # QueryEvent for the BEGIN
        self.assertIsInstance(self.stream.fetchone(), QueryEvent)
        self.assertIsInstance(self.stream.fetchone(), TableMapEvent)
        self.assertIsInstance(self.stream.fetchone(), UpdateRowsEvent)
        self.assertIsInstance(self.stream.fetchone(), XidEvent)
Author: darioush | Project: python-mysql-replication | Lines: 34 | Source: test_basic.py


Example 16: test_connection_stream_lost_event

    def test_connection_stream_lost_event(self):
        self.stream.close()
        self.stream = BinLogStreamReader(
            self.database, server_id=1024, blocking=True, ignored_events=self.ignoredEvents()
        )

        query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
        self.execute(query)
        query2 = "INSERT INTO test (data) VALUES('a')"
        for i in range(0, 10000):
            self.execute(query2)
        self.execute("COMMIT")

        self.assertIsInstance(self.stream.fetchone(), RotateEvent)
        self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)

        event = self.stream.fetchone()

        self.assertIsInstance(event, QueryEvent)
        self.assertEqual(event.query, query)

        self.conn_control.kill(self.stream._stream_connection.thread_id())
        for i in range(0, 1000):
            event = self.stream.fetchone()
            self.assertIsNotNone(event)
Author: chaoslawful | Project: python-mysql-replication | Lines: 25 | Source: test_basic.py


Example 17: test_log_pos_handles_disconnects

    def test_log_pos_handles_disconnects(self):
        self.stream.close()
        self.stream = BinLogStreamReader(
            self.database,
            server_id=1024,
            resume_stream=False,
            only_events=[FormatDescriptionEvent, QueryEvent, TableMapEvent, WriteRowsEvent, XidEvent],
        )

        query = "CREATE TABLE test (id INT  PRIMARY KEY AUTO_INCREMENT, data VARCHAR (50) NOT NULL)"
        self.execute(query)
        query = "INSERT INTO test (data) VALUES('Hello')"
        self.execute(query)
        self.execute("COMMIT")

        self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
        self.assertGreater(self.stream.log_pos, 0)
        self.assertIsInstance(self.stream.fetchone(), QueryEvent)

        self.assertIsInstance(self.stream.fetchone(), QueryEvent)
        self.assertIsInstance(self.stream.fetchone(), TableMapEvent)
        self.assertIsInstance(self.stream.fetchone(), WriteRowsEvent)

        self.assertIsInstance(self.stream.fetchone(), XidEvent)

        self.assertGreater(self.stream.log_pos, 0)
Author: chaoslawful | Project: python-mysql-replication | Lines: 26 | Source: test_basic.py


Example 18: test_position_gtid

    def test_position_gtid(self):
        query = "CREATE TABLE test (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
        self.execute(query)
        query = "BEGIN;"
        self.execute(query)
        query = "INSERT INTO test (id, data) VALUES(1, 'Hello');"
        self.execute(query)
        query = "COMMIT;"
        self.execute(query)

        query = "CREATE TABLE test2 (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
        self.execute(query)
        query = "SELECT @@global.gtid_executed;"
        gtid = self.execute(query).fetchone()[0]

        self.stream.close()
        self.stream = BinLogStreamReader(self.database, server_id=1024, blocking=True, auto_position=gtid)

        self.assertIsInstance(self.stream.fetchone(), RotateEvent)
        self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
        self.assertIsInstance(self.stream.fetchone(), GtidEvent)
        event = self.stream.fetchone()

        self.assertEqual(
            event.query, "CREATE TABLE test2 (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
        )
Author: chaoslawful | Project: python-mysql-replication | Lines: 26 | Source: test_basic.py
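Note on the test above: `auto_position` takes a GTID set (as returned by `SELECT @@global.gtid_executed`) and asks the server to stream every transaction not contained in that set, which is why the first event replayed is the later `CREATE TABLE test2` statement. This requires the MySQL server to run with `gtid_mode=ON`.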


Example 19: __init__

    def __init__(self, mysql_settings, server_id, dump_file_path,
                 log_file=None, log_pos=None,
                 gtid_set=None, table_filters=None):
        # TODO: gtid mode support
        # https://dev.mysql.com/doc/refman/en/replication-gtids.html
        # TODO: wild chars in table_filters
        self.mysql_settings = mysql_settings
        self.server_id = server_id
        self.log_file = log_file
        self.log_pos = log_pos
        self.dump_file_path = dump_file_path

        if table_filters:
            self.table_filters = {schema:frozenset(tables) for schema, tables in table_filters.items()}
            only_schemas = [schema for schema in table_filters]
        else:
            self.table_filters = None
            only_schemas = None

        self.binlog_stream_reader = BinLogStreamReader(
            connection_settings=self.mysql_settings,
            server_id=self.server_id,
            log_file=self.log_file,
            log_pos=self.log_pos,
            resume_stream=True,
            blocking=False,
            freeze_schema=True,
            only_schemas=only_schemas,
        )
Author: jffifa | Project: python-mysql-eventprocessor | Lines: 29 | Source: stream.py
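The excerpt stores `table_filters` but does not show how they are applied. A hypothetical filter check built on the structure above, assuming events are screened before processing:

    def _event_passes_filters(self, binlogevent):
        # Hypothetical helper: accept the event unless table_filters exclude it.
        if self.table_filters is None:
            return True
        tables = self.table_filters.get(binlogevent.schema)
        return tables is not None and binlogevent.table in tables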


Example 20: __init__

    def __init__(self, connection_settings, server_id, blocking=True, resume_stream=True):
        self._stream = BinLogStreamReader(
            connection_settings=connection_settings,
            server_id=server_id,
            only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
            blocking=blocking,
            resume_stream=resume_stream
        )
Author: JianfuLi | Project: hamal | Lines: 8 | Source: listener.py



Note: The pymysqlreplication.BinLogStreamReader class examples in this article were compiled by 纯净天空 from source code and documentation platforms such as GitHub and MSDocs. The snippets are selected from open-source projects contributed by various developers, and copyright remains with the original authors. Please consult each project's license before distributing or using the code, and do not reproduce this compilation without permission.

