// ***************************************************************************** // * This file is part of the FreeFileSync project. It is distributed under * // * GNU General Public License: https://www.gnu.org/licenses/gpl-3.0 * // * Copyright (C) Zenju (zenju AT freefilesync DOT org) - All Rights Reserved * // ***************************************************************************** #ifndef STATUS_HANDLER_IMPL_H_07682758976 #define STATUS_HANDLER_IMPL_H_07682758976 #include #include #include #include "process_callback.h" #include "speed_test.h" namespace fff { class AsyncCallback { public: AsyncCallback() {} //non-blocking: context of worker thread (and main thread, see reportStats()) void updateDataProcessed(int itemsDelta, int64_t bytesDelta) //noexcept! { itemsDeltaProcessed_ += itemsDelta; bytesDeltaProcessed_ += bytesDelta; } void updateDataTotal(int itemsDelta, int64_t bytesDelta) //noexcept! { itemsDeltaTotal_ += itemsDelta; bytesDeltaTotal_ += bytesDelta; } //context of worker thread void updateStatus(std::wstring&& msg) //throw ThreadStopRequest { assert(!zen::runningOnMainThread()); { std::lock_guard dummy(lockCurrentStatus_); if (ThreadStatus* ts = getThreadStatus()) //call while holding "lockCurrentStatus_" lock!! ts->statusMsg = std::move(msg); else assert(false); } zen::interruptionPoint(); //throw ThreadStopRequest } //blocking call: context of worker thread //=> indirect support for "pause": logInfo() is called under singleThread lock, // so all other worker threads will wait when coming out of parallel I/O (trying to lock singleThread) void logMessage(const std::wstring& msg, PhaseCallback::MsgType type) //throw ThreadStopRequest { assert(!zen::runningOnMainThread()); { std::unique_lock dummy(lockRequest_); zen::interruptibleWait(conditionReadyForNewRequest_, dummy, [this] { return !logMsgRequest_; }); //throw ThreadStopRequest logMsgRequest_ = LogMsgRequest{msg, type}; } conditionNewRequest.notify_all(); } //blocking call: context of worker thread PhaseCallback::Response reportError(const PhaseCallback::ErrorInfo& errorInfo) //throw ThreadStopRequest { assert(!zen::runningOnMainThread()); std::unique_lock dummy(lockRequest_); zen::interruptibleWait(conditionReadyForNewRequest_, dummy, [this] { return !errorRequest_ && !errorResponse_; }); //throw ThreadStopRequest errorRequest_ = errorInfo; conditionNewRequest.notify_all(); zen::interruptibleWait(conditionHaveResponse_, dummy, [this] { return static_cast(errorResponse_); }); //throw ThreadStopRequest PhaseCallback::Response rv = *errorResponse_; errorRequest_ = std::nullopt; errorResponse_ = std::nullopt; dummy.unlock(); //optimization for condition_variable::notify_all() conditionReadyForNewRequest_.notify_all(); //=> spurious wake-up for AsyncCallback::logInfo() return rv; } //blocking call: context of worker thread void reportWarning(const std::wstring& msg, bool& warningActive) //throw ThreadStopRequest { assert(!zen::runningOnMainThread()); { std::unique_lock dummy(lockRequest_); zen::interruptibleWait(conditionReadyForNewRequest_, dummy, [this] { return !warningRequest_ && !warningResponse_; }); //throw ThreadStopRequest warningRequest_ = WarningRequest{msg, warningActive}; conditionNewRequest.notify_all(); zen::interruptibleWait(conditionHaveResponse_, dummy, [this] { return static_cast(warningResponse_); }); //throw ThreadStopRequest warningActive = warningResponse_->warningActive; warningRequest_ = std::nullopt; warningResponse_ = std::nullopt; } conditionReadyForNewRequest_.notify_all(); //=> spurious wake-up for AsyncCallback::logInfo() } //context of main thread std::pair waitUntilDone(std::chrono::milliseconds cbInterval, PhaseCallback& cb) //throw X { assert(zen::runningOnMainThread()); for (;;) { const std::chrono::steady_clock::time_point callbackTime = std::chrono::steady_clock::now() + cbInterval; for (std::unique_lock dummy(lockRequest_);;) //process all errors without delay { const bool rv = conditionNewRequest.wait_until(dummy, callbackTime, [this] { return logMsgRequest_ || (errorRequest_ && !errorResponse_) || (warningRequest_ && !warningResponse_) || finishNowRequest_; }); if (!rv) //time-out + condition not met break; if (logMsgRequest_) { cb.logMessage(logMsgRequest_->msg, logMsgRequest_->type); //throw X logMsgRequest_ = {}; conditionReadyForNewRequest_.notify_all(); //=> spurious wake-up for AsyncCallback::reportError() } if (errorRequest_ && !errorResponse_) { assert(!finishNowRequest_); errorResponse_ = cb.reportError(*errorRequest_); //throw X conditionHaveResponse_.notify_all(); //instead of notify_one(); work around bug: https://svn.boost.org/trac/boost/ticket/7796 } if (warningRequest_ && !warningResponse_) { assert(!finishNowRequest_); bool warningActive = warningRequest_->warningActive; cb.reportWarning(warningRequest_->msg, warningActive); //throw X warningResponse_ = WarningResponse{warningActive}; conditionHaveResponse_.notify_all(); } if (finishNowRequest_) { dummy.unlock(); //call member functions outside of mutex scope: reportStats(cb); //one last call for accurate stat-reporting! return std::make_pair(itemsProcessed_, bytesProcessed_); } } //call back outside of mutex scope: cb.updateStatus(getStatusMsg()); //throw X reportStats(cb); } } void notifyTaskBegin(size_t prio) //noexcept { assert(!zen::runningOnMainThread()); const std::thread::id threadId = std::this_thread::get_id(); std::lock_guard dummy(lockCurrentStatus_); assert(!getThreadStatus()); if (statusByPriority_.size() < prio + 1) statusByPriority_.resize(prio + 1); statusByPriority_[prio].push_back({threadId, std::wstring()}); } void notifyTaskEnd() //noexcept { assert(!zen::runningOnMainThread()); const std::thread::id threadId = std::this_thread::get_id(); std::lock_guard dummy(lockCurrentStatus_); for (std::vector& sbp : statusByPriority_) for (ThreadStatus& ts : sbp) if (ts.threadId == threadId) { std::swap(ts, sbp.back()); sbp.pop_back(); return; } assert(false); } void notifyAllDone() //noexcept { { std::lock_guard dummy(lockRequest_); assert(!finishNowRequest_); finishNowRequest_ = true; } conditionNewRequest.notify_all(); } private: AsyncCallback (const AsyncCallback&) = delete; AsyncCallback& operator=(const AsyncCallback&) = delete; struct ThreadStatus { std::thread::id threadId; std::wstring statusMsg; }; ThreadStatus* getThreadStatus() //call while holding "lockCurrentStatus_" lock!! { assert(!zen::runningOnMainThread()); const std::thread::id threadId = std::this_thread::get_id(); for (std::vector& sbp : statusByPriority_) for (ThreadStatus& ts : sbp) //thread count is (hopefully) small enough so that linear search won't hurt perf if (ts.threadId == threadId) return &ts; return nullptr; } //context of main thread void reportStats(PhaseCallback& cb) { assert(zen::runningOnMainThread()); const int itemsDeltaProcessed = itemsDeltaProcessed_; //get value snapshot from atomics const int64_t bytesDeltaProcessed = bytesDeltaProcessed_; // if (itemsDeltaProcessed != 0 || bytesDeltaProcessed != 0) { updateDataProcessed (-itemsDeltaProcessed, -bytesDeltaProcessed); //careful with these atomics: don't just set to 0 cb.updateDataProcessed( itemsDeltaProcessed, bytesDeltaProcessed); //noexcept! itemsProcessed_ += itemsDeltaProcessed; bytesProcessed_ += bytesDeltaProcessed; } const int itemsDeltaTotal = itemsDeltaTotal_; const int64_t bytesDeltaTotal = bytesDeltaTotal_; if (itemsDeltaTotal != 0 || bytesDeltaTotal != 0) { updateDataTotal (-itemsDeltaTotal, -bytesDeltaTotal); cb.updateDataTotal( itemsDeltaTotal, bytesDeltaTotal); //noexcept! } } //context of main thread, call repreatedly std::wstring getStatusMsg() { assert(zen::runningOnMainThread()); size_t parallelOpsTotal = 0; std::wstring statusMsg; { std::lock_guard dummy(lockCurrentStatus_); for (const auto& sbp : statusByPriority_) parallelOpsTotal += sbp.size(); statusMsg = [&] { for (const std::vector& sbp : statusByPriority_) for (const ThreadStatus& ts : sbp) if (!ts.statusMsg.empty()) return ts.statusMsg; return std::wstring(); }(); } if (parallelOpsTotal >= 2) return L'[' + _P("1 thread", "%x threads", parallelOpsTotal) + L"] " + statusMsg; else return statusMsg; } struct LogMsgRequest { std::wstring msg; PhaseCallback::MsgType type = PhaseCallback::MsgType::error; }; struct WarningRequest { std::wstring msg; bool warningActive = false; }; struct WarningResponse { bool warningActive = false; }; //---- main <-> worker communication channel ---- std::mutex lockRequest_; std::condition_variable conditionReadyForNewRequest_; std::condition_variable conditionNewRequest; std::condition_variable conditionHaveResponse_; std::optional logMsgRequest_; std::optional errorRequest_; std::optional errorResponse_; std::optional warningRequest_; std::optional warningResponse_; bool finishNowRequest_ = false; //---- status updates ---- std::mutex lockCurrentStatus_; //different lock for status updates so that we're not blocked by other threads reporting errors std::vector> statusByPriority_; //give status messages priority according to their folder pair (e.g. first folder pair has prio 0) => visualize (somewhat) natural processing order //---- status updates II (lock-free) ---- std::atomic itemsDeltaProcessed_{0}; // std::atomic bytesDeltaProcessed_{0}; //std:atomic is uninitialized by default! std::atomic itemsDeltaTotal_ {0}; // std::atomic bytesDeltaTotal_ {0}; // //---- aggregated numbers; accessed by main thread only ---- int itemsProcessed_ = 0; int64_t bytesProcessed_ = 0; }; //manage statistics reporting for a single item of work template class ItemStatReporter { public: ItemStatReporter(int itemsExpected, int64_t bytesExpected, Callback& cb) : itemsExpected_(itemsExpected), bytesExpected_(bytesExpected), cb_(cb) {} ~ItemStatReporter() { const bool scopeFail = std::uncaught_exceptions() > exeptionCount_; if (scopeFail) cb_.updateDataTotal(itemsReported_, bytesReported_); //=> unexpected increase of total workload else //update statistics to consider the real amount of data, e.g. CopyFileEx: more than the "file size" for ADS streams, //less for sparse and compressed files, or file changed in the meantime! cb_.updateDataTotal(itemsReported_ - itemsExpected_, bytesReported_ - bytesExpected_); //noexcept! } void updateStatus(std::wstring&& msg) { cb_.updateStatus(std::move(msg)); } //throw X void logMessage(const std::wstring& msg, PhaseCallback::MsgType type) { cb_.logMessage(msg, type); } //throw X void reportWarning(const std::wstring& msg, bool& warningActive) { cb_.reportWarning(msg, warningActive); }//throw X void reportDelta(int itemsDelta, int64_t bytesDelta) //noexcept! { cb_.updateDataProcessed(itemsDelta, bytesDelta); //noexcept! itemsReported_ += itemsDelta; bytesReported_ += bytesDelta; //special rule: avoid temporary statistics mess up, even though they are corrected anyway below: if (itemsReported_ > itemsExpected_) { cb_.updateDataTotal(itemsReported_ - itemsExpected_, 0); //noexcept! itemsReported_ = itemsExpected_; } if (bytesReported_ > bytesExpected_) { cb_.updateDataTotal(0, bytesReported_ - bytesExpected_); //=> everything above "bytesExpected" adds to both "processed" and "total" data bytesReported_ = bytesExpected_; } } private: int itemsReported_ = 0; int64_t bytesReported_ = 0; const int itemsExpected_; const int64_t bytesExpected_; Callback& cb_; const int exeptionCount_ = std::uncaught_exceptions(); }; using AsyncItemStatReporter = ItemStatReporter; //===================================================================================================================== constexpr std::chrono::seconds STATUS_PERCENT_DELAY(2); constexpr std::chrono::seconds STATUS_PERCENT_MIN_DURATION(3); const int STATUS_PERCENT_MIN_CHANGES_PER_SEC = 2; constexpr std::chrono::seconds STATUS_PERCENT_SPEED_WINDOW(10); template struct PercentStatReporter { PercentStatReporter(const std::wstring& statusMsg, int64_t bytesExpected, ItemStatReporter& statReporter) : msgPrefix_(statusMsg + L"... "), bytesExpected_(bytesExpected), statReporter_(statReporter) {} //[!] no "updateStatus() /*throw X*/" in constructor! let caller decide void updateDeltaAndStatus(int64_t bytesDelta) //throw X { statReporter_.reportDelta(0 /*itemsDelta*/, bytesDelta); bytesCopied_ += bytesDelta; const auto now = std::chrono::steady_clock::now(); if (now >= lastUpdate_ + UI_UPDATE_INTERVAL / 2) //every ~25 ms { lastUpdate_ = now; if (!showPercent_ && bytesCopied_ > 0) { if (startTime_ == std::chrono::steady_clock::time_point()) { startTime_ = now; //get higher-quality perf stats when starting timing here rather than constructor!? speedTest_.addSample(std::chrono::seconds(0), 0 /*itemsCurrent*/, bytesCopied_); } else if (const std::chrono::nanoseconds elapsed = now - startTime_; elapsed >= STATUS_PERCENT_DELAY) { speedTest_.addSample(elapsed, 0 /*itemsCurrent*/, bytesCopied_); if (const std::optional remSecs = speedTest_.getRemainingSec(0 /*itemsRemaining*/, bytesExpected_ - bytesCopied_)) if (*remSecs > std::chrono::duration(STATUS_PERCENT_MIN_DURATION).count()) { showPercent_ = true; speedTest_.clear(); //discard (probably messy) numbers } } } if (showPercent_) { speedTest_.addSample(now - startTime_, 0 /*itemsCurrent*/, bytesCopied_); const std::optional bps = speedTest_.getBytesPerSec(); statReporter_.updateStatus(msgPrefix_ + formatPercent(std::min(static_cast(bytesCopied_) / bytesExpected_, 1.0), //> 100% possible! see process_callback.h notes bps ? *bps : 0, bytesExpected_)); //throw X } } } private: static std::wstring formatPercent(double fraction, double bytesPerSec, int64_t bytesTotal) { const double totalSecs = numeric::isNull(bytesPerSec) ? 0 : bytesTotal / bytesPerSec; const double expectedSteps = totalSecs * STATUS_PERCENT_MIN_CHANGES_PER_SEC; const int decPlaces = [&] //TODO? protect against format flickering!? { if (expectedSteps <= 100) return 0; if (expectedSteps <= 1000) return 1; if (expectedSteps <= 10000) return 2; if (expectedSteps <= 100000) return 3; //return static_cast(std::ceil(std::log10(expectedSteps))) - 2; -> overkill! /**/ return 4; }(); return zen::formatProgressPercent(fraction, decPlaces); } bool showPercent_ = false; const std::wstring msgPrefix_; const int64_t bytesExpected_; int64_t bytesCopied_ = 0; std::chrono::steady_clock::time_point startTime_; std::chrono::steady_clock::time_point lastUpdate_; SpeedTest speedTest_{STATUS_PERCENT_SPEED_WINDOW}; ItemStatReporter& statReporter_; }; //===================================================================================================================== template inline void reportInfo(std::wstring&& msg, Callback& cb /*throw X*/) //throw X { cb.logMessage(msg, PhaseCallback::MsgType::info); //throw X cb.updateStatus(std::move(msg)); // } template inline //return ignored error message if available std::wstring tryReportingError(Function cmd /*throw FileError*/, Callback& cb /*throw X*/) //throw X { for (size_t retryNumber = 0;; ++retryNumber) try { cmd(); //throw FileError return std::wstring(); } catch (const zen::FileError& e) { assert(!e.toString().empty()); switch (cb.reportError({e.toString(), std::chrono::steady_clock::now(), retryNumber})) //throw X { case PhaseCallback::ignore: return e.toString(); case PhaseCallback::retry: break; //continue with loop } } } //===================================================================================================================== struct ParallelContext { const AbstractPath& itemPath; AsyncCallback& acb; }; using ParallelWorkItem = std::function /*throw ThreadStopRequest*/; namespace { void massParallelExecute(const std::vector>& workload, const Zstring& threadGroupName, PhaseCallback& callback /*throw X*/) //throw X { using namespace zen; std::map*>> perDeviceWorkload; for (const auto& item : workload) perDeviceWorkload[item.first.afsDevice].push_back(&item); if (perDeviceWorkload.empty()) return; //[!] otherwise AsyncCallback::notifyAllDone() is never called! AsyncCallback acb; //manage life time: enclose ThreadGroup's!!! std::atomic activeDeviceCount(perDeviceWorkload.size()); // //--------------------------------------------------------------------------------------------------------- std::vector>> deviceThreadGroups; //worker threads live here... //--------------------------------------------------------------------------------------------------------- for (const auto& [afsDevice, wl] : perDeviceWorkload) { const size_t statusPrio = deviceThreadGroups.size(); const Zstring& deviceGroupName = threadGroupName + Zstr(' ') + utfTo(AFS::getDisplayPath(AbstractPath(afsDevice, AfsPath()))); deviceThreadGroups.emplace_back(1, deviceGroupName); auto& threadGroup = deviceThreadGroups.back(); for (const std::pair* item : wl) threadGroup.run([&acb, statusPrio, &itemPath = item->first, &task = item->second] { acb.notifyTaskBegin(statusPrio); ZEN_ON_SCOPE_EXIT(acb.notifyTaskEnd()); ParallelContext pctx{itemPath, acb}; task(pctx); //throw ThreadStopRequest }); threadGroup.notifyWhenDone([&acb, &activeDeviceCount] /*noexcept! runs on worker thread!*/ { if (--activeDeviceCount == 0) acb.notifyAllDone(); //noexcept }); } acb.waitUntilDone(UI_UPDATE_INTERVAL / 2 /*every ~25 ms*/, callback); //throw X } } //===================================================================================================================== template inline auto parallelScope(Function&& fun, std::mutex& singleThread) //throw X { singleThread.unlock(); ZEN_ON_SCOPE_EXIT(singleThread.lock()); return fun(); //throw X } } #endif //STATUS_HANDLER_IMPL_H_07682758976