本文整理汇总了C++中mutation_ptr类的典型用法代码示例。如果您正苦于以下问题:C++ mutation_ptr类的具体用法?C++ mutation_ptr怎么用?C++ mutation_ptr使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了mutation_ptr类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的C++代码示例。
示例1: send_prepare_message
void replica::send_prepare_message(const dsn_address_t& addr, partition_status status, mutation_ptr& mu, int timeout_milliseconds)
{
dsn_message_t msg = dsn_msg_create_request(RPC_PREPARE, timeout_milliseconds, gpid_to_hash(get_gpid()));
replica_configuration rconfig;
_primary_states.get_replica_config(status, rconfig);
{
msg_binary_writer writer(msg);
marshall(writer, get_gpid());
marshall(writer, rconfig);
mu->write_to(writer);
}
mu->remote_tasks()[addr] = rpc::call(addr, msg,
this,
std::bind(&replica::on_prepare_reply,
this,
std::make_pair(mu, rconfig.status),
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3),
gpid_to_hash(get_gpid())
);
ddebug(
"%s: mutation %s send_prepare_message to %s:%hu as %s",
name(), mu->name(),
addr.name, addr.port,
enum_to_string(rconfig.status)
);
}
开发者ID:asd1355215911,项目名称:rDSN,代码行数:31,代码来源:replica_2pc.cpp
示例2: send_prepare_message
void replica::send_prepare_message(const end_point& addr, partition_status status, mutation_ptr& mu, int timeout_milliseconds)
{
message_ptr msg = message::create_request(RPC_PREPARE, timeout_milliseconds, gpid_to_hash(get_gpid()));
marshall(msg, get_gpid());
replica_configuration rconfig;
_primary_states.get_replica_config(status, rconfig);
marshall(msg, rconfig);
mu->write_to(msg);
dbg_dassert (mu->remote_tasks().find(addr) == mu->remote_tasks().end());
mu->remote_tasks()[addr] = rpc::call(addr, msg,
this,
std::bind(&replica::on_prepare_reply,
this,
std::make_pair(mu, rconfig.status),
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3),
gpid_to_hash(get_gpid())
);
ddebug(
"%s: mutation %s send_prepare_message to %s:%d as %s",
name(), mu->name(),
addr.name.c_str(), static_cast<int>(addr.port),
enum_to_string(rconfig.status)
);
}
开发者ID:Abioy,项目名称:rDSN,代码行数:31,代码来源:replica_2pc.cpp
示例3: send_prepare_message
// Sends an RPC_PREPARE request for mutation `mu` to the replica at `addr`.
//
// The replica configuration is obtained from _primary_states for `status` and
// `learn_signature`. The request body holds the gpid, that configuration and the
// serialized mutation, in that order. The task returned by rpc::call is stored in
// mu->remote_tasks() under `addr`; the reply callback forwards to on_prepare_reply
// with the (mutation, target status) pair.
void replica::send_prepare_message(
    ::dsn::rpc_address addr,
    partition_status status,
    mutation_ptr& mu,
    int timeout_milliseconds,
    int64_t learn_signature)
{
    dsn_message_t prepare_request = dsn_msg_create_request(
        RPC_PREPARE,
        timeout_milliseconds,
        gpid_to_hash(get_gpid()));

    replica_configuration target_config;
    _primary_states.get_replica_config(status, target_config, learn_signature);

    {
        // the stream lives only inside this scope, so it is gone before the
        // request is handed to rpc::call
        rpc_write_stream body(prepare_request);
        marshall(body, get_gpid());
        marshall(body, target_config);
        mu->write_to(body);
    }

    mu->remote_tasks()[addr] = rpc::call(
        addr,
        prepare_request,
        this,
        // the callback keeps its own copies of the mutation pointer and the configuration
        [this, mu, target_config](error_code err, dsn_message_t request, dsn_message_t reply)
        {
            on_prepare_reply(std::make_pair(mu, target_config.status), err, request, reply);
        },
        gpid_to_hash(get_gpid()));

    ddebug(
        "%s: mutation %s send_prepare_message to %s as %s",
        name(), mu->name(),
        addr.to_string(),
        enum_to_string(target_config.status));
}
开发者ID:Jupige,项目名称:rDSN,代码行数:34,代码来源:replica_2pc.cpp
示例4: execute_mutation
// Applies mutation `mu` to the local app according to the replica's current status,
// logs the resulting error code, and hands any non-success code to
// handle_local_failure().
void replica::execute_mutation(mutation_ptr& mu)
{
    dassert (nullptr != _app, "");

    int err = ERR_SUCCESS;

    switch (status())
    {
    case PS_INACTIVE:
        // applied only when the mutation is the decree right after the app's last commit
        if (_app->last_committed_decree() + 1 == mu->data.header.decree)
        {
            err = _app->write_internal(mu, false);
        }
        break;

    case PS_PRIMARY:
    case PS_SECONDARY:
    {
        dassert (_app->last_committed_decree() + 1 == mu->data.header.decree, "");

        // the client is acked only on a primary, and only when the mutation carries
        // a client request whose sender ip is non-zero
        const bool ack_client =
            status() == PS_PRIMARY
            && !(mu->client_request == nullptr)
            && !(mu->client_request->header().from_address.ip == 0);

        err = _app->write_internal(mu, ack_client);
        break;
    }

    case PS_POTENTIAL_SECONDARY:
    {
        const bool learned = (LearningSucceeded == _potential_secondary_states.learning_status);
        if (!learned)
        {
            // drop mutations as learning will catch up
            ddebug("%s: mutation %s skipped coz learing buffer overflow", name(), mu->name());
        }
        else if (mu->data.header.decree == _app->last_committed_decree() + 1)
        {
            err = _app->write_internal(mu, false);
        }
        else
        {
            dassert (mu->data.header.decree <= _app->last_committed_decree(), "");
        }
        break;
    }

    case PS_ERROR:
        break;
    }

    ddebug("TwoPhaseCommit, %s: mutation %s committed, err = %x", name(), mu->name(), err);

    if (err != ERR_SUCCESS)
    {
        handle_local_failure(err);
    }
}
开发者ID:SunnyGyb,项目名称:rDSN,代码行数:55,代码来源:replica.cpp
示例5: on_append_log_completed
// Completion callback for appending mutation `mu` to the replication log.
//
// Marks the mutation as logged on success. Nothing further happens for a mutation
// whose ballot is older than the current one, or when the replica is inactive.
// Otherwise a primary tries to commit (or reports the failure), and a secondary or
// potential secondary reports any failure and acks the prepare unless
// _options.prepare_ack_on_secondary_before_logging_allowed is set.
void replica::on_append_log_completed(mutation_ptr& mu, error_code err, size_t size)
{
    check_hashed_access();

    ddebug( "%s: mutation %s on_append_log_completed, err = %s", name(), mu->name(), err.to_string());

    const bool append_ok = (err == ERR_OK);
    if (append_ok)
    {
        mu->set_logged();
    }

    // skip old mutations
    const bool stale = mu->data.header.ballot < get_ballot();
    if (stale || status() == PS_INACTIVE)
    {
        return;
    }

    switch (status())
    {
    case PS_PRIMARY:
        if (!append_ok)
        {
            handle_local_failure(err);
        }
        else
        {
            do_possible_commit_on_primary(mu);
        }
        break;

    case PS_SECONDARY:
    case PS_POTENTIAL_SECONDARY:
        if (!append_ok)
        {
            handle_local_failure(err);
        }
        // the ack carries `err`, so it is sent on failure as well
        if (!_options.prepare_ack_on_secondary_before_logging_allowed)
        {
            ack_prepare_message(err, mu);
        }
        break;

    case PS_ERROR:
        break;

    default:
        dassert (false, "");
        break;
    }
}
开发者ID:asd1355215911,项目名称:rDSN,代码行数:48,代码来源:replica_2pc.cpp
示例6: copy_from
// Copies the payload of `old` into this mutation: the update list, the client
// request list, the approximate data size, the creation timestamp and the prepare
// message. A reference is added to every non-null client request and to the
// prepare message, if there is one.
void mutation::copy_from(mutation_ptr& old)
{
    data.updates = old->data.updates;
    client_requests = old->client_requests;
    _appro_data_bytes = old->_appro_data_bytes;
    _create_ts_ns = old->_create_ts_ns;

    for (auto& request : client_requests)
    {
        if (request != nullptr)
        {
            dsn_msg_add_ref(request); // release in dctor
        }
    }

    // The logged flag and the log offset of `old` are intentionally NOT copied:
    // the mutation is always re-appended to the replication logs because the
    // ballot number has changed, which keeps the invariant
    //     if decree(A) >= decree(B) then ballot(A) >= ballot(B)
    // The code that used to carry them over is kept here, disabled:
    /*if (old->is_logged())
    {
        set_logged();
        data.header.log_offset = old->data.header.log_offset;
    }
    */

    _prepare_request = old->prepare_msg();
    if (_prepare_request)
    {
        dsn_msg_add_ref(_prepare_request);
    }
}
开发者ID:ykwd,项目名称:rDSN,代码行数:33,代码来源:mutation.cpp
示例7: ack_prepare_message
// Replies to the prepare message of mutation `mu` with a prepare_ack that carries
// `err`, this replica's gpid and ballot, the mutation's decree, and the last
// committed decrees of both the app and the prepare list.
// Asserts that the mutation actually has a prepare message to reply to.
void replica::ack_prepare_message(error_code err, mutation_ptr& mu)
{
    prepare_ack ack;
    ack.gpid = get_gpid();
    ack.err = err;
    ack.ballot = get_ballot();
    ack.decree = mu->data.header.decree;

    // for PS_POTENTIAL_SECONDARY ONLY
    ack.last_committed_decree_in_app = _app->last_committed_decree();
    ack.last_committed_decree_in_prepare_list = last_committed_decree();

    dassert(nullptr != mu->prepare_msg(), "");
    reply(mu->prepare_msg(), ack);

    ddebug("%s: mutation %s ack_prepare_message, err = %s", name(), mu->name(), err.to_string());
}
开发者ID:Jupige,项目名称:rDSN,代码行数:17,代码来源:replica_2pc.cpp
示例8: move_from
// Takes over the payload of `old`: the update list is moved, the rpc code is copied,
// and the client and prepare messages are taken from `old`. For each message that
// is non-null, the matching member of `old` is reset to nullptr, so no extra
// reference is added here.
void mutation::move_from(mutation_ptr& old)
{
    data.updates = std::move(old->data.updates);
    rpc_code = old->rpc_code;

    // client request
    _client_request = old->client_msg();
    if (_client_request)
    {
        old->_client_request = nullptr;
    }

    // prepare request
    _prepare_request = old->prepare_msg();
    if (_prepare_request)
    {
        old->_prepare_request = nullptr;
    }
}
开发者ID:Bran-Stark,项目名称:rDSN,代码行数:17,代码来源:mutation.cpp
示例9: on_append_log_completed
// Completion callback for appending mutation `mu` to the replication log.
//
// Marks the mutation as logged on success. Nothing further happens for a mutation
// whose ballot is older than the current one, or when the replica is inactive.
// Otherwise a primary tries to commit (or reports the failure), and a secondary or
// potential secondary reports any failure and then always acks the prepare.
void replica::on_append_log_completed(mutation_ptr& mu, uint32_t err, uint32_t size)
{
    check_hashed_access();

    ddebug( "%s: mutation %s on_append_log_completed, err = %u", name(), mu->name(), err);

    const bool append_ok = (err == ERR_SUCCESS);
    if (append_ok)
    {
        mu->set_logged();
    }

    // skip old mutations
    const bool stale = mu->data.header.ballot < get_ballot();
    if (stale || status() == PS_INACTIVE)
    {
        return;
    }

    switch (status())
    {
    case PS_PRIMARY:
        if (!append_ok)
        {
            handle_local_failure(err);
        }
        else
        {
            do_possible_commit_on_primary(mu);
        }
        break;

    case PS_SECONDARY:
    case PS_POTENTIAL_SECONDARY:
        if (!append_ok)
        {
            handle_local_failure(err);
        }
        // the ack carries `err`, so it is sent on failure as well
        ack_prepare_message(err, mu);
        break;

    case PS_ERROR:
        break;

    default:
        dassert (false, "");
        break;
    }
}
开发者ID:Abioy,项目名称:rDSN,代码行数:44,代码来源:replica_2pc.cpp
示例10: do_possible_commit_on_primary
// On a primary, commits the decree of `mu` through the prepare list once the
// mutation reports that it is ready for commit. Asserts that the mutation's ballot
// matches the current configuration and that this replica is the primary.
void replica::do_possible_commit_on_primary(mutation_ptr& mu)
{
    dassert (_config.ballot == mu->data.header.ballot, "");
    dassert (PS_PRIMARY == status(), "");

    if (!mu->is_ready_for_commit())
    {
        return;
    }

    _prepare_list->commit(mu->data.header.decree, COMMIT_ALL_READY);
}
开发者ID:Jupige,项目名称:rDSN,代码行数:10,代码来源:replica_2pc.cpp
示例11: do_possible_commit_on_primary
// On a primary, commits the decree of `mu` through the prepare list once the
// mutation reports that it is ready for commit; the readiness check is given
// _options.prepare_ack_on_secondary_before_logging_allowed. Asserts that the
// mutation's ballot matches the current configuration and that this replica is
// the primary.
void replica::do_possible_commit_on_primary(mutation_ptr& mu)
{
    dassert (_config.ballot == mu->data.header.ballot, "");
    dassert (PS_PRIMARY == status(), "");

    if (!mu->is_ready_for_commit(_options.prepare_ack_on_secondary_before_logging_allowed))
    {
        return;
    }

    _prepare_list->commit(mu->data.header.decree, false);
}
开发者ID:asd1355215911,项目名称:rDSN,代码行数:10,代码来源:replica_2pc.cpp
示例12: copy_from
// Copies the payload of `old` into this mutation: the update list, the rpc code,
// the logged state (together with the log offset when `old` is logged), and the
// client and prepare messages. A reference is added to each message that is
// non-null.
void mutation::copy_from(mutation_ptr& old)
{
    data.updates = old->data.updates;
    rpc_code = old->rpc_code;

    // carry over the logged state and where the mutation sits in the log
    if (old->is_logged())
    {
        set_logged();
        data.header.log_offset = old->data.header.log_offset;
    }

    // client request
    _client_request = old->client_msg();
    if (_client_request)
    {
        dsn_msg_add_ref(_client_request);
    }

    // prepare request
    _prepare_request = old->prepare_msg();
    if (_prepare_request)
    {
        dsn_msg_add_ref(_prepare_request);
    }
}
开发者ID:strategist922,项目名称:rDSN,代码行数:22,代码来源:mutation.cpp
示例13: write_internal
// Applies mutation `mu` to the local app.
//
// A non-empty write is dispatched as an RPC call over the first update buffer, with
// a response message created only when the mutation has a client message; an empty
// write goes to on_empty_write(). Returns ERR_LOCAL_APP_FAILURE (after logging) when
// _physical_error is non-zero afterwards, ERR_OK otherwise.
error_code replication_app_base::write_internal(mutation_ptr& mu)
{
    dassert (mu->data.header.decree == last_committed_decree() + 1, "");

    if (mu->rpc_code != RPC_REPLICATION_WRITE_EMPTY)
    {
        binary_reader reader(mu->data.updates[0]);

        dsn_message_t resp = nullptr;
        if (mu->client_msg())
        {
            resp = dsn_msg_create_response(mu->client_msg());
        }

        dispatch_rpc_call(mu->rpc_code, reader, resp);
    }
    else
    {
        on_empty_write();
    }

    if (_physical_error != 0)
    {
        derror("physical error %d occurs in replication local app %s", _physical_error, data_dir().c_str());
        return ERR_LOCAL_APP_FAILURE;
    }

    return ERR_OK;
}
开发者ID:jango2015,项目名称:rDSN,代码行数:22,代码来源:replication_app_base.cpp
示例14: write_internal
// Applies every client request batched in mutation `mu` to the local app, in order.
//
// _batch_state is BS_NOT_BATCH for a single request, otherwise BS_BATCH, switching
// to BS_BATCH_LAST when the final request is reached. Each non-empty request is
// dispatched as an RPC call and its duration is recorded in _app_commit_latency.
// Returns ERR_LOCAL_APP_FAILURE as soon as _physical_error is non-zero after a
// request; in that case the committed decree and the commit counters are not
// updated. Returns ERR_OK once all requests are applied.
error_code replication_app_base::write_internal(mutation_ptr& mu)
{
    dassert (mu->data.header.decree == last_committed_decree() + 1, "");
    dassert(mu->client_requests.size() == mu->data.updates.size()
        && mu->client_requests.size() > 0,
        "data inconsistency in mutation");

    const int count = static_cast<int>(mu->client_requests.size());
    if (count == 1)
    {
        _batch_state = BS_NOT_BATCH;
    }
    else
    {
        _batch_state = BS_BATCH;
    }

    for (int i = 0; i < count; i++)
    {
        // flag the final request of a batch
        if (_batch_state == BS_BATCH && i + 1 == count)
        {
            _batch_state = BS_BATCH_LAST;
        }

        auto& request = mu->client_requests[i];

        // an empty mutation write has nothing to dispatch
        if (request.code != RPC_REPLICATION_WRITE_EMPTY)
        {
            dinfo("%s: mutation %s dispatch rpc call: %s",
                _replica->name(), mu->name(), dsn_task_code_to_string(request.code));

            binary_reader reader(mu->data.updates[i]);

            dsn_message_t resp = nullptr;
            if (request.req)
            {
                resp = dsn_msg_create_response(request.req);
            }

            const uint64_t start_ns = dsn_now_ns();
            dispatch_rpc_call(request.code, reader, resp);
            const uint64_t elapsed_ns = dsn_now_ns() - start_ns;
            _app_commit_latency.set(elapsed_ns);
        }

        if (_physical_error != 0)
        {
            derror("%s: physical error %d occurs in replication local app %s",
                _replica->name(), _physical_error, data_dir().c_str());
            return ERR_LOCAL_APP_FAILURE;
        }
    }

    ++_last_committed_decree;

    _replica->update_commit_statistics(count);
    _app_commit_throughput.add((uint64_t)count);
    _app_commit_decree.increment();

    return ERR_OK;
}
开发者ID:Strongc,项目名称:rDSN,代码行数:51,代码来源:replication_app_base.cpp
示例15: init_prepare
// Starts the prepare phase for mutation `mu` on the primary.
//
// Steps, in order: check the minimum replica count, assign ballot/decree to the
// mutation, check the staleness bound, prepare locally, send prepare messages to
// all secondaries and to the learners whose prepare_start_decree covers this
// decree, then append the mutation to the replication log.
// If any of the checks or the local prepare fails, the client message is answered
// with the error and nothing else is done.
void replica::init_prepare(mutation_ptr& mu)
{
    dassert (PS_PRIMARY == status(), "");

    // shared failure path: answer the client with the error
    const auto reject = [this, &mu](error_code reason)
    {
        response_client_message(mu->client_msg(), reason);
    };

    if (static_cast<int>(_primary_states.membership.secondaries.size()) + 1 < _options.mutation_2pc_min_replica_count)
    {
        reject(ERR_NOT_ENOUGH_MEMBER);
        return;
    }

    mu->data.header.last_committed_decree = last_committed_decree();

    // a mutation without a decree gets the one following the prepare list's maximum;
    // otherwise its decree is kept and only the ballot is refreshed
    if (mu->data.header.decree == invalid_decree)
    {
        mu->set_id(get_ballot(), _prepare_list->max_decree() + 1);
    }
    else
    {
        mu->set_id(get_ballot(), mu->data.header.decree);
    }

    ddebug("%s: mutation %s init_prepare", name(), mu->name());

    // check bounded staleness
    if (mu->data.header.decree > last_committed_decree() + _options.staleness_for_commit)
    {
        reject(ERR_CAPACITY_EXCEEDED);
        return;
    }

    dassert (mu->data.header.decree > last_committed_decree(), "");

    // local prepare
    error_code prepare_err = _prepare_list->prepare(mu, PS_PRIMARY);
    if (prepare_err != ERR_OK)
    {
        reject(prepare_err);
        return;
    }

    // remote prepare
    mu->set_prepare_ts();
    mu->set_left_secondary_ack_count((unsigned int)_primary_states.membership.secondaries.size());
    for (auto& secondary : _primary_states.membership.secondaries)
    {
        send_prepare_message(secondary, PS_SECONDARY, mu, _options.prepare_timeout_ms_for_secondaries);
    }

    uint8_t learner_count = 0;
    for (auto& learner : _primary_states.learners)
    {
        const bool covered =
            learner.second.prepare_start_decree != invalid_decree
            && mu->data.header.decree >= learner.second.prepare_start_decree;
        if (!covered)
        {
            continue;
        }

        send_prepare_message(learner.first, PS_POTENTIAL_SECONDARY, mu, _options.prepare_timeout_ms_for_potential_secondaries);
        learner_count++;
    }
    mu->set_left_potential_secondary_ack_count(learner_count);

    // it is possible to do commit here when logging is not required for acking prepare.
    // however, it is only possible when replica count == 1 at this moment in the
    // replication group, and we don't want to do this as it is too fragile now.
    // do_possible_commit_on_primary(mu);

    // local log
    dassert (mu->data.header.log_offset == invalid_offset, "");
    dassert (mu->log_task() == nullptr, "");
    mu->log_task() = _stub->_log->append(
        mu,
        LPC_WRITE_REPLICATION_LOG,
        this,
        std::bind(
            &replica::on_append_log_completed,
            this,
            mu,
            std::placeholders::_1,
            std::placeholders::_2),
        gpid_to_hash(get_gpid()));

    dassert(nullptr != mu->log_task(), "");
}
开发者ID:asd1355215911,项目名称:rDSN,代码行数:84,代码来源:replica_2pc.cpp
示例16: init_prepare
// Starts the prepare phase for mutation `mu` on the primary.
//
// Steps, in order: check the minimum replica count, assign ballot/decree to the
// mutation, check the prepare list capacity, prepare locally, send prepare messages
// to all secondaries and to the learners whose prepare_start_decree covers this
// decree, then append the mutation to the replication log.
// On any failure the client request is answered with the error; a failed log append
// is additionally reported through handle_local_failure().
void replica::init_prepare(mutation_ptr& mu)
{
    dassert (PS_PRIMARY == status(), "");

    // shared failure path: answer the client with the error
    const auto reject = [this, &mu](error_code reason)
    {
        response_client_message(mu->client_request, reason);
    };

    if (static_cast<int>(_primary_states.membership.secondaries.size()) + 1 < _options.mutation_2pc_min_replica_count)
    {
        reject(ERR_NOT_ENOUGH_MEMBER);
        return;
    }

    mu->data.header.last_committed_decree = last_committed_decree();

    // a mutation without a decree gets the one following the prepare list's maximum;
    // otherwise its decree is kept and only the ballot is refreshed
    if (mu->data.header.decree == invalid_decree)
    {
        mu->set_id(get_ballot(), _prepare_list->max_decree() + 1);
    }
    else
    {
        mu->set_id(get_ballot(), mu->data.header.decree);
    }

    // a new decree is refused once the prepare list already holds
    // staleness_for_commit entries
    if (mu->data.header.decree > _prepare_list->max_decree() && _prepare_list->count() >= _options.staleness_for_commit)
    {
        reject(ERR_CAPACITY_EXCEEDED);
        return;
    }

    dassert (mu->data.header.decree > last_committed_decree(), "");

    // local prepare without log
    error_code prepare_err = _prepare_list->prepare(mu, PS_PRIMARY);
    if (prepare_err != ERR_SUCCESS)
    {
        reject(prepare_err);
        return;
    }

    ddebug("%s: mutation %s init_prepare", name(), mu->name());

    //
    // TODO: bounded staleness on secondaries
    //
    dassert (mu->data.header.decree <= last_committed_decree() + _options.staleness_for_commit, "");

    // remote prepare
    dassert (mu->remote_tasks().size() == 0, "");
    mu->set_left_secondary_ack_count((unsigned int)_primary_states.membership.secondaries.size());
    for (auto& secondary : _primary_states.membership.secondaries)
    {
        send_prepare_message(secondary, PS_SECONDARY, mu, _options.prepare_timeout_ms_for_secondaries);
    }

    uint8_t learner_count = 0;
    for (auto& learner : _primary_states.learners)
    {
        const bool covered =
            learner.second.prepare_start_decree != invalid_decree
            && mu->data.header.decree >= learner.second.prepare_start_decree;
        if (!covered)
        {
            continue;
        }

        send_prepare_message(learner.first, PS_POTENTIAL_SECONDARY, mu, _options.prepare_timeout_ms_for_potential_secondaries);
        learner_count++;
    }
    mu->set_left_potential_secondary_ack_count(learner_count);

    // local log
    dassert (mu->data.header.log_offset == invalid_offset, "");
    dassert (mu->log_task() == nullptr, "");
    mu->log_task() = _stub->_log->append(
        mu,
        LPC_WRITE_REPLICATION_LOG,
        this,
        std::bind(
            &replica::on_append_log_completed,
            this,
            mu,
            std::placeholders::_1,
            std::placeholders::_2),
        gpid_to_hash(get_gpid()));

    // no task means the append could not be issued
    if (nullptr == mu->log_task())
    {
        error_code append_err = ERR_FILE_OPERATION_FAILED;
        handle_local_failure(append_err);
        reject(append_err);
    }
}
开发者ID:Abioy,项目名称:rDSN,代码行数:89,代码来源:replica_2pc.cpp
示例17: init_prepare
// Starts the prepare phase for mutation `mu` on the primary.
//
// Steps, in order: assign ballot/decree to the mutation, check the staleness bound,
// prepare locally, send prepare messages to all secondaries and to the learners
// whose prepare_start_decree covers this decree (passing each learner's signature),
// then either try to commit right away (mutation already logged) or append the
// mutation to the replication log.
// If the staleness check or the local prepare fails, every client request of the
// mutation is answered with the error and nothing else is done.
void replica::init_prepare(mutation_ptr& mu)
{
    dassert (PS_PRIMARY == status(), "");

    // shared failure path: answer every batched client request with the error
    const auto reject = [this, &mu](error_code reason)
    {
        for (auto& r : mu->client_requests)
        {
            response_client_message(r, reason);
        }
    };

    mu->data.header.last_committed_decree = last_committed_decree();

    // a mutation without a decree gets the one following the prepare list's maximum;
    // otherwise its decree is kept and only the ballot is refreshed
    if (mu->data.header.decree == invalid_decree)
    {
        mu->set_id(get_ballot(), _prepare_list->max_decree() + 1);
    }
    else
    {
        mu->set_id(get_ballot(), mu->data.header.decree);
    }

    dinfo("%s: mutation %s init_prepare, mutation_tid=%" PRIu64, name(), mu->name(), mu->tid());

    // check bounded staleness
    if (mu->data.header.decree > last_committed_decree() + _options->staleness_for_commit)
    {
        reject(ERR_CAPACITY_EXCEEDED);
        return;
    }

    dassert (mu->data.header.decree > last_committed_decree(), "");

    // local prepare
    error_code prepare_err = _prepare_list->prepare(mu, PS_PRIMARY);
    if (prepare_err != ERR_OK)
    {
        reject(prepare_err);
        return;
    }

    // remote prepare
    mu->set_prepare_ts();
    mu->set_left_secondary_ack_count((unsigned int)_primary_states.membership.secondaries.size());
    for (auto& secondary : _primary_states.membership.secondaries)
    {
        send_prepare_message(secondary, PS_SECONDARY, mu, _options->prepare_timeout_ms_for_secondaries);
    }

    uint8_t learner_count = 0;
    for (auto& learner : _primary_states.learners)
    {
        const bool covered =
            learner.second.prepare_start_decree != invalid_decree
            && mu->data.header.decree >= learner.second.prepare_start_decree;
        if (!covered)
        {
            continue;
        }

        send_prepare_message(learner.first, PS_POTENTIAL_SECONDARY, mu, _options->prepare_timeout_ms_for_potential_secondaries, learner.second.signature);
        learner_count++;
    }
    mu->set_left_potential_secondary_ack_count(learner_count);

    if (mu->is_logged())
    {
        // already in the log: no append needed, go straight to the commit check
        do_possible_commit_on_primary(mu);
        return;
    }

    // local log
    dassert(mu->data.header.log_offset == invalid_offset, "");
    dassert(mu->log_task() == nullptr, "");
    mu->log_task() = _stub->_log->append(
        mu,
        LPC_WRITE_REPLICATION_LOG,
        this,
        std::bind(
            &replica::on_append_log_completed,
            this,
            mu,
            std::placeholders::_1,
            std::placeholders::_2),
        gpid_to_hash(get_gpid()));

    dassert(nullptr != mu->log_task(), "");
}
开发者ID:Jupige,项目名称:rDSN,代码行数:83,代码来源:replica_2pc.cpp
示例18: on_append_log_completed
// Completion callback for appending mutation `mu` to the replication log.
//
// Marks the mutation as logged on success. For a mutation whose ballot is older
// than the current one, or when the replica is inactive, the function returns right
// after that. Otherwise the status-specific handling runs (commit attempt on a
// primary, failure report plus ack on a secondary or potential secondary), and then
// either the log failure is passed to the stub or, on success, the mutation is also
// appended to the private log when there is one and the replica is not in
// PS_ERROR.
void replica::on_append_log_completed(mutation_ptr& mu, error_code err, size_t size)
{
    check_hashed_access();

    ddebug( "%s: mutation %s on_append_log_completed, err = %s", name(), mu->name(), err.to_string());

    const bool append_ok = (err == ERR_OK);
    if (append_ok)
    {
        mu->set_logged();
    }

    // skip old mutations
    const bool stale = mu->data.header.ballot < get_ballot();
    if (stale || status() == PS_INACTIVE)
    {
        return;
    }

    switch (status())
    {
    case PS_PRIMARY:
        if (!append_ok)
        {
            handle_local_failure(err);
        }
        else
        {
            do_possible_commit_on_primary(mu);
        }
        break;

    case PS_SECONDARY:
    case PS_POTENTIAL_SECONDARY:
        if (!append_ok)
        {
            handle_local_failure(err);
        }
        // the ack carries `err`, so it is sent on failure as well
        ack_prepare_message(err, mu);
        break;

    case PS_ERROR:
        break;

    default:
        dassert (false, "");
        break;
    }

    if (!append_ok)
    {
        // mutation log failure, propagated to all replicas
        _stub->handle_log_failure(err);
    }
    else if (_private_log && status() != PS_ERROR)
    {
        // write local private log if necessary
        _private_log->append(
            mu,
            LPC_WRITE_REPLICATION_LOG,
            this,
            [this](error_code private_err, size_t)
            {
                if (private_err != ERR_OK)
                {
                    handle_local_failure(private_err);
                }
            },
            gpid_to_hash(get_gpid()));
    }
}
开发者ID:fxfslm,项目名称:rDSN,代码行数:68,代码来源:replica_2pc.cpp
示例19: on_append_log_completed
// Completion callback for appending mutation `mu` to the shared replication log.
//
// mu   - the mutation whose append has completed
// err  - result of the append; ERR_OK means success
// size - size reported by the log for this append (only logged here)
//
// Marks the mutation as logged on success and logs an error otherwise. For a
// mutation whose ballot is not older than the current one, on a replica that is not
// inactive, the status-specific handling runs: a primary tries to commit (or reports
// the failure); a secondary or potential secondary reports any failure and always
// acks the prepare. A log failure is then passed to the stub. On success the
// mutation is also appended to the private log when there is one and the replica is
// not in PS_ERROR.
//
// `size` is a size_t but the log formats use %u, so it is explicitly narrowed to
// unsigned int at each logging call; passing a size_t to %u is a printf argument
// type mismatch on platforms where size_t is wider than unsigned int.
void replica::on_append_log_completed(mutation_ptr& mu, error_code err, size_t size)
{
    check_hashed_access();

    dinfo("%s: append shared log completed for mutation %s, size = %u, err = %s",
        name(), mu->name(), static_cast<unsigned int>(size), err.to_string());

    if (err == ERR_OK)
    {
        mu->set_logged();
    }
    else
    {
        derror("%s: append shared log failed for mutation %s, err = %s",
            name(), mu->name(), err.to_string());
    }

    // skip old mutations
    if (mu->data.header.ballot >= get_ballot() && status() != PS_INACTIVE)
    {
        switch (status())
        {
        case PS_PRIMARY:
            if (err == ERR_OK)
            {
                do_possible_commit_on_primary(mu);
            }
            else
            {
                handle_local_failure(err);
            }
            break;
        case PS_SECONDARY:
        case PS_POTENTIAL_SECONDARY:
            if (err != ERR_OK)
            {
                handle_local_failure(err);
            }
            // always ack
            ack_prepare_message(err, mu);
            break;
        case PS_ERROR:
            break;
        default:
            dassert(false, "");
            break;
        }
    }

    if (err != ERR_OK)
    {
        // mutation log failure, propagate to all replicas
        _stub->handle_log_failure(err);
    }

    // write local private log if necessary
    if (err == ERR_OK && _private_log && status() != PS_ERROR)
    {
        _private_log->append(mu,
            LPC_WRITE_REPLICATION_LOG,
            nullptr,
            [this, mu](error_code err, size_t size)
            {
                //
                // DO NOT CHANGE THIS CALLBACK HERE UNLESS
                // YOU FULLY UNDERSTAND WHAT WE DO HERE
                //
                // AS PRIVATE LOG IS BATCHED, WE ONLY EXECUTE
                // THE FIRST CALLBACK IF THERE IS FAILURE TO
                // NOTIFY FAILURE. ALL OTHER TASKS ARE SIMPLY
                // CANCELLED!!!
                //
                // TODO: we do not need so many callbacks
                //
                // (only the %u argument is narrowed below; the callback's logic is untouched)
                dinfo("%s: append private log completed for mutation %s, size = %u, err = %s",
                    name(), mu->name(), static_cast<unsigned int>(size), err.to_string());

                if (err != ERR_OK)
                {
                    derror("%s: append private log failed for mutation %s, err = %s",
                        name(), mu->name(), err.to_string());
                    handle_local_failure(err);
                }
            },
            gpid_to_hash(get_gpid())
            );
    }
}
开发者ID:Jupige,项目名称:rDSN,代码行数:89,代码来源:replica_2pc.cpp
示例20: on_append_log_completed
// Completion callback for appending mutation `mu` to the shared replication log.
//
// mu   - the mutation whose append has completed
// err  - result of the append; ERR_OK means success
// size - size reported by the log for this append (only logged here)
//
// Marks the mutation as logged on success and logs an error otherwise. For a
// mutation whose ballot is not older than the current one, on a replica that is not
// inactive, the status-specific handling runs: a primary tries to commit (or reports
// the failure); a secondary or potential secondary reports any failure and always
// acks the prepare. A log failure is then passed to the stub. On success the
// mutation is also appended to the private log (without owner or callback) when
// there is one and the replica is not in PS_ERROR.
//
// `size` is a size_t but the log format uses %u, so it is explicitly narrowed to
// unsigned int for logging; passing a size_t to %u is a printf argument type
// mismatch on platforms where size_t is wider than unsigned int.
void replica::on_append_log_completed(mutation_ptr& mu, error_code err, size_t size)
{
    check_hashed_access();

    dinfo("%s: append shared log completed for mutation %s, size = %u, err = %s",
        name(), mu->name(), static_cast<unsigned int>(size), err.to_string());

    if (err == ERR_OK)
    {
        mu->set_logged();
    }
    else
    {
        derror("%s: append shared log failed for mutation %s, err = %s",
            name(), mu->name(), err.to_string());
    }

    // skip old mutations
    if (mu->data.header.ballot >= get_ballot() && status() != partition_status::PS_INACTIVE)
    {
        switch (status())
        {
        case partition_status::PS_PRIMARY:
            if (err == ERR_OK)
            {
                do_possible_commit_on_primary(mu);
            }
            else
            {
                handle_local_failure(err);
            }
            break;
        case partition_status::PS_SECONDARY:
        case partition_status::PS_POTENTIAL_SECONDARY:
            if (err != ERR_OK)
            {
                handle_local_failure(err);
            }
            // always ack
            ack_prepare_message(err, mu);
            break;
        case partition_status::PS_ERROR:
            break;
        default:
            dassert(false, "");
            break;
        }
    }

    if (err != ERR_OK)
    {
        // mutation log failure, propagate to all replicas
        _stub->handle_log_failure(err);
    }

    // write local private log if necessary
    if (err == ERR_OK && _private_log && status() != partition_status::PS_ERROR)
    {
        _private_log->append(mu,
            LPC_WRITE_REPLICATION_LOG,
            nullptr,
            nullptr,
            gpid_to_hash(get_gpid())
            );
    }
}
开发者ID:mcfatealan,项目名称:rDSN,代码行数:66,代码来源:replica_2pc.cpp
注:本文中的mutation_ptr类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论