diff --git a/configs/local/nezha-replica-config-1.yaml b/configs/local/nezha-replica-config-1.yaml index 7a64a61..152d0af 100755 --- a/configs/local/nezha-replica-config-1.yaml +++ b/configs/local/nezha-replica-config-1.yaml @@ -33,6 +33,6 @@ crash-vector-request-period-ms: 50 # make it longer, becasue it will clear all p recovery-request-period-ms: 10 sync-report-period-ms: 10 key-num: 1000000 # When commutativity is enabled for test, we need the specified key number to detect commutativity -moving_percentile: 0.50 # the percentile used to estimate owd +moving-percentile: 0.50 # the percentile used to estimate owd owd-estimation-window: 1000 # We use a sliding window to get moving median of one-way delays, here is the length of the window reclaim-timeout-ms: 5000 # To save memory, those requests who enter the late buffer will not stay there forever: if they have stayed for so long, they will be reclaimed. Similary, for unsynced log entries, if (1) they have been kept for more than reclaim-timeout-ms (2) they are not used by the worker threads; then they will be reclaimed diff --git a/configs/local/nezha-replica-config-2.yaml b/configs/local/nezha-replica-config-2.yaml index 73e89d4..e4fcdcb 100755 --- a/configs/local/nezha-replica-config-2.yaml +++ b/configs/local/nezha-replica-config-2.yaml @@ -33,6 +33,6 @@ crash-vector-request-period-ms: 50 # make it longer, becasue it will clear all p recovery-request-period-ms: 10 sync-report-period-ms: 10 key-num: 1000000 # When commutativity is enabled for test, we need the specified key number to detect commutativity -moving_percentile: 0.50 # the percentile used to estimate owd +moving-percentile: 0.50 # the percentile used to estimate owd owd-estimation-window: 1000 # We use a sliding window to get moving median of one-way delays, here is the length of the window reclaim-timeout-ms: 5000 # To save memory, those requests who enter the late buffer will not stay there forever: if they have stayed for so long, they will be reclaimed. Similary, for unsynced log entries, if (1) they have been kept for more than reclaim-timeout-ms (2) they are not used by the worker threads; then they will be reclaimed diff --git a/lib/common_struct.h b/lib/common_struct.h index 3d16726..4eaad0d 100644 --- a/lib/common_struct.h +++ b/lib/common_struct.h @@ -137,7 +137,7 @@ struct LogEntry { LogEntry* next; // The next LogEntry pointer std::string result; // The execution result of the LogEntry - char status; // + char status; // TODO(Katie): does this need to be thread safe? uint32_t logId; // The logId (the position of the LogEntry in the list) of // the entry diff --git a/replica/replica.cc b/replica/replica.cc index f9ac7bc..da1c9a7 100755 --- a/replica/replica.cc +++ b/replica/replica.cc @@ -668,20 +668,6 @@ void Replica::RecordTd(int id) { while (status_ != ReplicaStatus::TERMINATED) { BlockWhenStatusIsNot(ReplicaStatus::NORMAL); if (recordQu_[id].try_dequeue(rb)) { - // cnt++; - // if (cnt == 1) { - // sta = GetMicrosecondTimestamp(); - // } - // if (cnt % 100000 == 0) { - // ed = GetMicrosecondTimestamp(); - // float rate = 100000.0 / ((ed - sta) * 1e-6); - // sta = ed; - // LOG(INFO) << "id=" << id << " record rate = " << rate << "\t" - // << "recordQuLen=" << recordQu_[id].size_approx() << "\t" - // << "processQuLen=" << processQu_.size_approx() << "\t" - // << "gap sample =" << ed - rb->deadline - // << " \t deadline=" << rb->deadline; - // } /** The map is sharded by reqKey */ LogEntry* duplicate = recordMap_[id].get(rb->reqKey); if (duplicate == NULL) { @@ -690,10 +676,13 @@ void Replica::RecordTd(int id) { recordMap_[id].assign(rb->reqKey, newEntry); processQu_.enqueue(newEntry); - } else { - // Duplicate requests - processQu_.enqueue(duplicate); + } else if (duplicate->status == EntryStatus::PROCESSED) { + uint32_t quId = (duplicate->body.reqKey) % fastReplyQu_.size(); + fastReplyQu_[quId].enqueue( + duplicate); // TODO(Katie): what happens if proxy id changes? } + // else: duplicate is not yet processed. Reply is sent when duplicate is + // processed. delete rb; } } @@ -776,15 +765,6 @@ void Replica::ProcessTd(int id) { entry->status = EntryStatus::IN_LATEBUFFER; } } - } else if (entry->status == EntryStatus::IN_PROCESS || - entry->status == EntryStatus::IN_LATEBUFFER) { - continue; - } else if (entry->status == EntryStatus::PROCESSED) { - uint32_t quId = (entry->body.reqKey) % fastReplyQu_.size(); - fastReplyQu_[quId].enqueue(entry); - } else if (entry->status == EntryStatus::TO_SLOW_REPLY) { - uint32_t quId = (entry->body.reqKey) % slowReplyQu_.size(); - slowReplyQu_[quId].enqueue(entry); } else { LOG(WARNING) << "Unexpected Entry Status " << (int)(entry->status); }