560 lines
22 KiB
C++
560 lines
22 KiB
C++
// *****************************************************************************
|
|
// * This file is part of the FreeFileSync project. It is distributed under *
|
|
// * GNU General Public License: https://www.gnu.org/licenses/gpl-3.0 *
|
|
// * Copyright (C) Zenju (zenju AT freefilesync DOT org) - All Rights Reserved *
|
|
// *****************************************************************************
|
|
|
|
#ifndef STATUS_HANDLER_IMPL_H_07682758976
|
|
#define STATUS_HANDLER_IMPL_H_07682758976
|
|
|
|
#include <zen/basic_math.h>
|
|
#include <zen/file_error.h>
|
|
#include <zen/thread.h>
|
|
#include "process_callback.h"
|
|
#include "speed_test.h"
|
|
|
|
|
|
namespace fff
|
|
{
|
|
class AsyncCallback
|
|
{
|
|
public:
|
|
AsyncCallback() {}
|
|
|
|
//non-blocking: context of worker thread (and main thread, see reportStats())
|
|
void updateDataProcessed(int itemsDelta, int64_t bytesDelta) //noexcept!
|
|
{
|
|
itemsDeltaProcessed_ += itemsDelta;
|
|
bytesDeltaProcessed_ += bytesDelta;
|
|
}
|
|
void updateDataTotal(int itemsDelta, int64_t bytesDelta) //noexcept!
|
|
{
|
|
itemsDeltaTotal_ += itemsDelta;
|
|
bytesDeltaTotal_ += bytesDelta;
|
|
}
|
|
|
|
//context of worker thread
|
|
void updateStatus(std::wstring&& msg) //throw ThreadStopRequest
|
|
{
|
|
assert(!zen::runningOnMainThread());
|
|
{
|
|
std::lock_guard dummy(lockCurrentStatus_);
|
|
if (ThreadStatus* ts = getThreadStatus()) //call while holding "lockCurrentStatus_" lock!!
|
|
ts->statusMsg = std::move(msg);
|
|
else assert(false);
|
|
}
|
|
zen::interruptionPoint(); //throw ThreadStopRequest
|
|
}
|
|
|
|
//blocking call: context of worker thread
|
|
//=> indirect support for "pause": logInfo() is called under singleThread lock,
|
|
// so all other worker threads will wait when coming out of parallel I/O (trying to lock singleThread)
|
|
void logMessage(const std::wstring& msg, PhaseCallback::MsgType type) //throw ThreadStopRequest
|
|
{
|
|
assert(!zen::runningOnMainThread());
|
|
{
|
|
std::unique_lock dummy(lockRequest_);
|
|
zen::interruptibleWait(conditionReadyForNewRequest_, dummy, [this] { return !logMsgRequest_; }); //throw ThreadStopRequest
|
|
|
|
logMsgRequest_ = LogMsgRequest{msg, type};
|
|
}
|
|
conditionNewRequest.notify_all();
|
|
}
|
|
|
|
//blocking call: context of worker thread
|
|
PhaseCallback::Response reportError(const PhaseCallback::ErrorInfo& errorInfo) //throw ThreadStopRequest
|
|
{
|
|
assert(!zen::runningOnMainThread());
|
|
std::unique_lock dummy(lockRequest_);
|
|
zen::interruptibleWait(conditionReadyForNewRequest_, dummy, [this] { return !errorRequest_ && !errorResponse_; }); //throw ThreadStopRequest
|
|
|
|
errorRequest_ = errorInfo;
|
|
conditionNewRequest.notify_all();
|
|
|
|
zen::interruptibleWait(conditionHaveResponse_, dummy, [this] { return static_cast<bool>(errorResponse_); }); //throw ThreadStopRequest
|
|
|
|
PhaseCallback::Response rv = *errorResponse_;
|
|
|
|
errorRequest_ = std::nullopt;
|
|
errorResponse_ = std::nullopt;
|
|
|
|
dummy.unlock(); //optimization for condition_variable::notify_all()
|
|
conditionReadyForNewRequest_.notify_all(); //=> spurious wake-up for AsyncCallback::logInfo()
|
|
return rv;
|
|
}
|
|
|
|
//blocking call: context of worker thread
|
|
void reportWarning(const std::wstring& msg, bool& warningActive) //throw ThreadStopRequest
|
|
{
|
|
assert(!zen::runningOnMainThread());
|
|
{
|
|
std::unique_lock dummy(lockRequest_);
|
|
zen::interruptibleWait(conditionReadyForNewRequest_, dummy, [this] { return !warningRequest_ && !warningResponse_; }); //throw ThreadStopRequest
|
|
|
|
warningRequest_ = WarningRequest{msg, warningActive};
|
|
conditionNewRequest.notify_all();
|
|
|
|
zen::interruptibleWait(conditionHaveResponse_, dummy, [this] { return static_cast<bool>(warningResponse_); }); //throw ThreadStopRequest
|
|
|
|
warningActive = warningResponse_->warningActive;
|
|
|
|
warningRequest_ = std::nullopt;
|
|
warningResponse_ = std::nullopt;
|
|
}
|
|
conditionReadyForNewRequest_.notify_all(); //=> spurious wake-up for AsyncCallback::logInfo()
|
|
}
|
|
|
|
//context of main thread
|
|
std::pair<int /*itemsProcessed*/, int64_t /*bytesProcessed*/> waitUntilDone(std::chrono::milliseconds cbInterval, PhaseCallback& cb) //throw X
|
|
{
|
|
assert(zen::runningOnMainThread());
|
|
for (;;)
|
|
{
|
|
const std::chrono::steady_clock::time_point callbackTime = std::chrono::steady_clock::now() + cbInterval;
|
|
|
|
for (std::unique_lock dummy(lockRequest_);;) //process all errors without delay
|
|
{
|
|
const bool rv = conditionNewRequest.wait_until(dummy, callbackTime, [this]
|
|
{
|
|
return logMsgRequest_ || (errorRequest_ && !errorResponse_) || (warningRequest_ && !warningResponse_) || finishNowRequest_;
|
|
});
|
|
if (!rv) //time-out + condition not met
|
|
break;
|
|
|
|
if (logMsgRequest_)
|
|
{
|
|
cb.logMessage(logMsgRequest_->msg, logMsgRequest_->type); //throw X
|
|
logMsgRequest_ = {};
|
|
conditionReadyForNewRequest_.notify_all(); //=> spurious wake-up for AsyncCallback::reportError()
|
|
}
|
|
if (errorRequest_ && !errorResponse_)
|
|
{
|
|
assert(!finishNowRequest_);
|
|
errorResponse_ = cb.reportError(*errorRequest_); //throw X
|
|
conditionHaveResponse_.notify_all(); //instead of notify_one(); work around bug: https://svn.boost.org/trac/boost/ticket/7796
|
|
}
|
|
if (warningRequest_ && !warningResponse_)
|
|
{
|
|
assert(!finishNowRequest_);
|
|
bool warningActive = warningRequest_->warningActive;
|
|
cb.reportWarning(warningRequest_->msg, warningActive); //throw X
|
|
warningResponse_ = WarningResponse{warningActive};
|
|
conditionHaveResponse_.notify_all();
|
|
}
|
|
if (finishNowRequest_)
|
|
{
|
|
dummy.unlock(); //call member functions outside of mutex scope:
|
|
reportStats(cb); //one last call for accurate stat-reporting!
|
|
return std::make_pair(itemsProcessed_, bytesProcessed_);
|
|
}
|
|
}
|
|
|
|
//call back outside of mutex scope:
|
|
cb.updateStatus(getStatusMsg()); //throw X
|
|
reportStats(cb);
|
|
}
|
|
}
|
|
|
|
void notifyTaskBegin(size_t prio) //noexcept
|
|
{
|
|
assert(!zen::runningOnMainThread());
|
|
const std::thread::id threadId = std::this_thread::get_id();
|
|
std::lock_guard dummy(lockCurrentStatus_);
|
|
assert(!getThreadStatus());
|
|
|
|
if (statusByPriority_.size() < prio + 1)
|
|
statusByPriority_.resize(prio + 1);
|
|
|
|
statusByPriority_[prio].push_back({threadId, std::wstring()});
|
|
}
|
|
|
|
void notifyTaskEnd() //noexcept
|
|
{
|
|
assert(!zen::runningOnMainThread());
|
|
const std::thread::id threadId = std::this_thread::get_id();
|
|
std::lock_guard dummy(lockCurrentStatus_);
|
|
|
|
for (std::vector<ThreadStatus>& sbp : statusByPriority_)
|
|
for (ThreadStatus& ts : sbp)
|
|
if (ts.threadId == threadId)
|
|
{
|
|
std::swap(ts, sbp.back());
|
|
sbp.pop_back();
|
|
return;
|
|
}
|
|
assert(false);
|
|
}
|
|
|
|
void notifyAllDone() //noexcept
|
|
{
|
|
{
|
|
std::lock_guard dummy(lockRequest_);
|
|
assert(!finishNowRequest_);
|
|
finishNowRequest_ = true;
|
|
}
|
|
conditionNewRequest.notify_all();
|
|
}
|
|
|
|
private:
|
|
AsyncCallback (const AsyncCallback&) = delete;
|
|
AsyncCallback& operator=(const AsyncCallback&) = delete;
|
|
|
|
struct ThreadStatus
|
|
{
|
|
std::thread::id threadId;
|
|
std::wstring statusMsg;
|
|
};
|
|
|
|
ThreadStatus* getThreadStatus() //call while holding "lockCurrentStatus_" lock!!
|
|
{
|
|
assert(!zen::runningOnMainThread());
|
|
const std::thread::id threadId = std::this_thread::get_id();
|
|
|
|
for (std::vector<ThreadStatus>& sbp : statusByPriority_)
|
|
for (ThreadStatus& ts : sbp) //thread count is (hopefully) small enough so that linear search won't hurt perf
|
|
if (ts.threadId == threadId)
|
|
return &ts;
|
|
return nullptr;
|
|
}
|
|
|
|
//context of main thread
|
|
void reportStats(PhaseCallback& cb)
|
|
{
|
|
assert(zen::runningOnMainThread());
|
|
const int itemsDeltaProcessed = itemsDeltaProcessed_; //get value snapshot from atomics
|
|
const int64_t bytesDeltaProcessed = bytesDeltaProcessed_; //
|
|
if (itemsDeltaProcessed != 0 || bytesDeltaProcessed != 0)
|
|
{
|
|
updateDataProcessed (-itemsDeltaProcessed, -bytesDeltaProcessed); //careful with these atomics: don't just set to 0
|
|
cb.updateDataProcessed( itemsDeltaProcessed, bytesDeltaProcessed); //noexcept!
|
|
|
|
itemsProcessed_ += itemsDeltaProcessed;
|
|
bytesProcessed_ += bytesDeltaProcessed;
|
|
}
|
|
|
|
const int itemsDeltaTotal = itemsDeltaTotal_;
|
|
const int64_t bytesDeltaTotal = bytesDeltaTotal_;
|
|
if (itemsDeltaTotal != 0 || bytesDeltaTotal != 0)
|
|
{
|
|
updateDataTotal (-itemsDeltaTotal, -bytesDeltaTotal);
|
|
cb.updateDataTotal( itemsDeltaTotal, bytesDeltaTotal); //noexcept!
|
|
}
|
|
}
|
|
|
|
//context of main thread, call repreatedly
|
|
std::wstring getStatusMsg()
|
|
{
|
|
assert(zen::runningOnMainThread());
|
|
|
|
size_t parallelOpsTotal = 0;
|
|
std::wstring statusMsg;
|
|
{
|
|
std::lock_guard dummy(lockCurrentStatus_);
|
|
|
|
for (const auto& sbp : statusByPriority_)
|
|
parallelOpsTotal += sbp.size();
|
|
|
|
statusMsg = [&]
|
|
{
|
|
for (const std::vector<ThreadStatus>& sbp : statusByPriority_)
|
|
for (const ThreadStatus& ts : sbp)
|
|
if (!ts.statusMsg.empty())
|
|
return ts.statusMsg;
|
|
return std::wstring();
|
|
}();
|
|
}
|
|
if (parallelOpsTotal >= 2)
|
|
return L'[' + _P("1 thread", "%x threads", parallelOpsTotal) + L"] " + statusMsg;
|
|
else
|
|
return statusMsg;
|
|
}
|
|
|
|
struct LogMsgRequest
|
|
{
|
|
std::wstring msg;
|
|
PhaseCallback::MsgType type = PhaseCallback::MsgType::error;
|
|
};
|
|
struct WarningRequest
|
|
{
|
|
std::wstring msg;
|
|
bool warningActive = false;
|
|
};
|
|
struct WarningResponse { bool warningActive = false; };
|
|
|
|
//---- main <-> worker communication channel ----
|
|
std::mutex lockRequest_;
|
|
std::condition_variable conditionReadyForNewRequest_;
|
|
std::condition_variable conditionNewRequest;
|
|
std::condition_variable conditionHaveResponse_;
|
|
std::optional<LogMsgRequest> logMsgRequest_;
|
|
std::optional<PhaseCallback::ErrorInfo> errorRequest_;
|
|
std::optional<PhaseCallback::Response > errorResponse_;
|
|
std::optional<WarningRequest> warningRequest_;
|
|
std::optional<WarningResponse> warningResponse_;
|
|
bool finishNowRequest_ = false;
|
|
|
|
//---- status updates ----
|
|
std::mutex lockCurrentStatus_; //different lock for status updates so that we're not blocked by other threads reporting errors
|
|
std::vector<std::vector<ThreadStatus>> statusByPriority_;
|
|
//give status messages priority according to their folder pair (e.g. first folder pair has prio 0) => visualize (somewhat) natural processing order
|
|
|
|
//---- status updates II (lock-free) ----
|
|
std::atomic<int> itemsDeltaProcessed_{0}; //
|
|
std::atomic<int64_t> bytesDeltaProcessed_{0}; //std:atomic is uninitialized by default!
|
|
std::atomic<int> itemsDeltaTotal_ {0}; //
|
|
std::atomic<int64_t> bytesDeltaTotal_ {0}; //
|
|
|
|
//---- aggregated numbers; accessed by main thread only ----
|
|
int itemsProcessed_ = 0;
|
|
int64_t bytesProcessed_ = 0;
|
|
};
|
|
|
|
|
|
//manage statistics reporting for a single item of work
|
|
template <class Callback>
|
|
class ItemStatReporter
|
|
{
|
|
public:
|
|
ItemStatReporter(int itemsExpected, int64_t bytesExpected, Callback& cb) :
|
|
itemsExpected_(itemsExpected),
|
|
bytesExpected_(bytesExpected),
|
|
cb_(cb) {}
|
|
|
|
~ItemStatReporter()
|
|
{
|
|
const bool scopeFail = std::uncaught_exceptions() > exeptionCount_;
|
|
if (scopeFail)
|
|
cb_.updateDataTotal(itemsReported_, bytesReported_); //=> unexpected increase of total workload
|
|
else
|
|
//update statistics to consider the real amount of data, e.g. CopyFileEx: more than the "file size" for ADS streams,
|
|
//less for sparse and compressed files, or file changed in the meantime!
|
|
cb_.updateDataTotal(itemsReported_ - itemsExpected_, bytesReported_ - bytesExpected_); //noexcept!
|
|
}
|
|
|
|
void updateStatus(std::wstring&& msg) { cb_.updateStatus(std::move(msg)); } //throw X
|
|
|
|
void logMessage(const std::wstring& msg, PhaseCallback::MsgType type) { cb_.logMessage(msg, type); } //throw X
|
|
|
|
void reportWarning(const std::wstring& msg, bool& warningActive) { cb_.reportWarning(msg, warningActive); }//throw X
|
|
|
|
void reportDelta(int itemsDelta, int64_t bytesDelta) //noexcept!
|
|
{
|
|
cb_.updateDataProcessed(itemsDelta, bytesDelta); //noexcept!
|
|
itemsReported_ += itemsDelta;
|
|
bytesReported_ += bytesDelta;
|
|
|
|
//special rule: avoid temporary statistics mess up, even though they are corrected anyway below:
|
|
if (itemsReported_ > itemsExpected_)
|
|
{
|
|
cb_.updateDataTotal(itemsReported_ - itemsExpected_, 0); //noexcept!
|
|
itemsReported_ = itemsExpected_;
|
|
}
|
|
if (bytesReported_ > bytesExpected_)
|
|
{
|
|
cb_.updateDataTotal(0, bytesReported_ - bytesExpected_); //=> everything above "bytesExpected" adds to both "processed" and "total" data
|
|
bytesReported_ = bytesExpected_;
|
|
}
|
|
}
|
|
|
|
private:
|
|
int itemsReported_ = 0;
|
|
int64_t bytesReported_ = 0;
|
|
const int itemsExpected_;
|
|
const int64_t bytesExpected_;
|
|
Callback& cb_;
|
|
const int exeptionCount_ = std::uncaught_exceptions();
|
|
};
|
|
|
|
using AsyncItemStatReporter = ItemStatReporter<AsyncCallback>;
|
|
|
|
//=====================================================================================================================
|
|
|
|
//tuning parameters for PercentStatReporter:
constexpr std::chrono::seconds STATUS_PERCENT_DELAY(2);        //time to collect speed samples before deciding whether to show a percentage
constexpr std::chrono::seconds STATUS_PERCENT_MIN_DURATION(3); //show a percentage only if the estimated remaining time exceeds this
constexpr int STATUS_PERCENT_MIN_CHANGES_PER_SEC = 2;          //used to determine the number of decimal places of the percentage
constexpr std::chrono::seconds STATUS_PERCENT_SPEED_WINDOW(10); //window passed to the SpeedTest instance
|
|
|
|
template <class Callback>
|
|
struct PercentStatReporter
|
|
{
|
|
PercentStatReporter(const std::wstring& statusMsg, int64_t bytesExpected, ItemStatReporter<Callback>& statReporter) :
|
|
msgPrefix_(statusMsg + L"... "),
|
|
bytesExpected_(bytesExpected),
|
|
statReporter_(statReporter) {}
|
|
//[!] no "updateStatus() /*throw X*/" in constructor! let caller decide
|
|
|
|
void updateDeltaAndStatus(int64_t bytesDelta) //throw X
|
|
{
|
|
statReporter_.reportDelta(0 /*itemsDelta*/, bytesDelta);
|
|
bytesCopied_ += bytesDelta;
|
|
|
|
const auto now = std::chrono::steady_clock::now();
|
|
if (now >= lastUpdate_ + UI_UPDATE_INTERVAL / 2) //every ~25 ms
|
|
{
|
|
lastUpdate_ = now;
|
|
|
|
if (!showPercent_ && bytesCopied_ > 0)
|
|
{
|
|
if (startTime_ == std::chrono::steady_clock::time_point())
|
|
{
|
|
startTime_ = now; //get higher-quality perf stats when starting timing here rather than constructor!?
|
|
speedTest_.addSample(std::chrono::seconds(0), 0 /*itemsCurrent*/, bytesCopied_);
|
|
}
|
|
else if (const std::chrono::nanoseconds elapsed = now - startTime_;
|
|
elapsed >= STATUS_PERCENT_DELAY)
|
|
{
|
|
speedTest_.addSample(elapsed, 0 /*itemsCurrent*/, bytesCopied_);
|
|
|
|
if (const std::optional<double> remSecs = speedTest_.getRemainingSec(0 /*itemsRemaining*/, bytesExpected_ - bytesCopied_))
|
|
if (*remSecs > std::chrono::duration<double>(STATUS_PERCENT_MIN_DURATION).count())
|
|
{
|
|
showPercent_ = true;
|
|
speedTest_.clear(); //discard (probably messy) numbers
|
|
}
|
|
}
|
|
}
|
|
if (showPercent_)
|
|
{
|
|
speedTest_.addSample(now - startTime_, 0 /*itemsCurrent*/, bytesCopied_);
|
|
const std::optional<double> bps = speedTest_.getBytesPerSec();
|
|
|
|
statReporter_.updateStatus(msgPrefix_ + formatPercent(std::min(static_cast<double>(bytesCopied_) / bytesExpected_, 1.0), //> 100% possible! see process_callback.h notes
|
|
bps ? *bps : 0, bytesExpected_)); //throw X
|
|
}
|
|
}
|
|
}
|
|
|
|
private:
|
|
static std::wstring formatPercent(double fraction, double bytesPerSec, int64_t bytesTotal)
|
|
{
|
|
const double totalSecs = numeric::isNull(bytesPerSec) ? 0 : bytesTotal / bytesPerSec;
|
|
const double expectedSteps = totalSecs * STATUS_PERCENT_MIN_CHANGES_PER_SEC;
|
|
|
|
const int decPlaces = [&] //TODO? protect against format flickering!?
|
|
{
|
|
if (expectedSteps <= 100) return 0;
|
|
if (expectedSteps <= 1000) return 1;
|
|
if (expectedSteps <= 10000) return 2;
|
|
if (expectedSteps <= 100000) return 3;
|
|
//return static_cast<int>(std::ceil(std::log10(expectedSteps))) - 2; -> overkill!
|
|
/**/ return 4;
|
|
}();
|
|
return zen::formatProgressPercent(fraction, decPlaces);
|
|
}
|
|
|
|
bool showPercent_ = false;
|
|
const std::wstring msgPrefix_;
|
|
const int64_t bytesExpected_;
|
|
int64_t bytesCopied_ = 0;
|
|
std::chrono::steady_clock::time_point startTime_;
|
|
std::chrono::steady_clock::time_point lastUpdate_;
|
|
SpeedTest speedTest_{STATUS_PERCENT_SPEED_WINDOW};
|
|
ItemStatReporter<Callback>& statReporter_;
|
|
};
|
|
|
|
//=====================================================================================================================
|
|
|
|
template <class Callback> inline
|
|
void reportInfo(std::wstring&& msg, Callback& cb /*throw X*/) //throw X
|
|
{
|
|
cb.logMessage(msg, PhaseCallback::MsgType::info); //throw X
|
|
cb.updateStatus(std::move(msg)); //
|
|
}
|
|
|
|
|
|
template <class Function, class Callback> inline //return ignored error message if available
|
|
std::wstring tryReportingError(Function cmd /*throw FileError*/, Callback& cb /*throw X*/) //throw X
|
|
{
|
|
for (size_t retryNumber = 0;; ++retryNumber)
|
|
try
|
|
{
|
|
cmd(); //throw FileError
|
|
return std::wstring();
|
|
}
|
|
catch (const zen::FileError& e)
|
|
{
|
|
assert(!e.toString().empty());
|
|
switch (cb.reportError({e.toString(), std::chrono::steady_clock::now(), retryNumber})) //throw X
|
|
{
|
|
case PhaseCallback::ignore:
|
|
return e.toString();
|
|
case PhaseCallback::retry:
|
|
break; //continue with loop
|
|
}
|
|
}
|
|
}
|
|
|
|
//=====================================================================================================================
|
|
struct ParallelContext
|
|
{
|
|
const AbstractPath& itemPath;
|
|
AsyncCallback& acb;
|
|
};
|
|
using ParallelWorkItem = std::function<void(ParallelContext& ctx)> /*throw ThreadStopRequest*/;
|
|
|
|
|
|
namespace
|
|
{
|
|
void massParallelExecute(const std::vector<std::pair<AbstractPath, ParallelWorkItem>>& workload,
|
|
const Zstring& threadGroupName,
|
|
PhaseCallback& callback /*throw X*/) //throw X
|
|
{
|
|
using namespace zen;
|
|
|
|
std::map<AfsDevice, std::vector<const std::pair<AbstractPath, ParallelWorkItem>*>> perDeviceWorkload;
|
|
for (const auto& item : workload)
|
|
perDeviceWorkload[item.first.afsDevice].push_back(&item);
|
|
|
|
if (perDeviceWorkload.empty())
|
|
return; //[!] otherwise AsyncCallback::notifyAllDone() is never called!
|
|
|
|
AsyncCallback acb; //manage life time: enclose ThreadGroup's!!!
|
|
std::atomic<size_t> activeDeviceCount(perDeviceWorkload.size()); //
|
|
|
|
//---------------------------------------------------------------------------------------------------------
|
|
std::vector<ThreadGroup<std::function<void()>>> deviceThreadGroups; //worker threads live here...
|
|
//---------------------------------------------------------------------------------------------------------
|
|
|
|
for (const auto& [afsDevice, wl] : perDeviceWorkload)
|
|
{
|
|
const size_t statusPrio = deviceThreadGroups.size();
|
|
|
|
const Zstring& deviceGroupName = threadGroupName + Zstr(' ') + utfTo<Zstring>(AFS::getDisplayPath(AbstractPath(afsDevice, AfsPath())));
|
|
deviceThreadGroups.emplace_back(1, deviceGroupName);
|
|
auto& threadGroup = deviceThreadGroups.back();
|
|
|
|
for (const std::pair<AbstractPath, ParallelWorkItem>* item : wl)
|
|
threadGroup.run([&acb, statusPrio, &itemPath = item->first, &task = item->second]
|
|
{
|
|
acb.notifyTaskBegin(statusPrio);
|
|
ZEN_ON_SCOPE_EXIT(acb.notifyTaskEnd());
|
|
|
|
ParallelContext pctx{itemPath, acb};
|
|
task(pctx); //throw ThreadStopRequest
|
|
});
|
|
|
|
threadGroup.notifyWhenDone([&acb, &activeDeviceCount] /*noexcept! runs on worker thread!*/
|
|
{
|
|
if (--activeDeviceCount == 0)
|
|
acb.notifyAllDone(); //noexcept
|
|
});
|
|
}
|
|
|
|
acb.waitUntilDone(UI_UPDATE_INTERVAL / 2 /*every ~25 ms*/, callback); //throw X
|
|
}
|
|
}
|
|
|
|
//=====================================================================================================================
|
|
|
|
template <class Function> inline
|
|
auto parallelScope(Function&& fun, std::mutex& singleThread) //throw X
|
|
{
|
|
singleThread.unlock();
|
|
ZEN_ON_SCOPE_EXIT(singleThread.lock());
|
|
|
|
return fun(); //throw X
|
|
}
|
|
}
|
|
|
|
#endif //STATUS_HANDLER_IMPL_H_07682758976
|