Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
846 views
in Technique[技术] by (71.8m points)

multithreading - Reusing thread in loop c++

I need to parallelize some tasks in a C++ program and am completely new to parallel programming. I've made some progress through internet searches so far, but am a bit stuck now. I'd like to reuse some threads in a loop, but clearly don't know how to do what I'm trying for.

I am acquiring data from two ADC cards on the computer (acquired in parallel), then I need to perform some operations on the collected data (processed in parallel) while collecting the next batch of data. Here is some pseudocode to illustrate

//Acquire some data, wait for all the data to be acquired before proceeding
std::thread acq1(AcquireData, boardHandle1, memoryAddress1a);
std::thread acq2(AcquireData, boardHandle2, memoryAddress2a);
acq1.join();
acq2.join();

while(user doesn't interrupt)
{

//Process first batch of data while acquiring new data
std::thread proc1(ProcessData,memoryAddress1a);
std::thread proc2(ProcessData,memoryAddress2a);
acq1(AcquireData, boardHandle1, memoryAddress1b);
acq2(AcquireData, boardHandle2, memoryAddress2b);
acq1.join();
acq2.join();
proc1.join();
proc2.join();
/*Proceed in this manner, alternating which memory address 
is written to and being processed until the user interrupts the program.*/
}

That's the main gist of it. The next run of the loop would write to the "a" memory addresses while processing the "b" data and continue to alternate (I can get the code to do that, just took it out to prevent cluttering up the problem).

Anyway, the problem (as I'm sure some people can already tell) is that the second time I try to use acq1 and acq2, the compiler (VS2012) says "IntelliSense: call of an object of a class type without appropriate operator() or conversion functions to pointer-to-function type". Likewise, if I put std::thread in front of acq1 and acq2 again, it says " error C2374: 'acq1' : redefinition; multiple initialization".

So the question is, can I reassign threads to a new task when they have completed their previous task? I always wait for the previous use of the thread to end before calling it again, but I don't know how to reassign the thread, and since it's in a loop, I can't make a new thread each time (or if I could, that seems wasteful and unnecessary, but I could be mistaken).

Thanks in advance

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

The easiest way is to use a waitable queue of std::function objects. Like this:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <chrono>


class ThreadPool
{
    public:

    ThreadPool (int threads) : shutdown_ (false)
    {
        // Create the specified number of threads
        threads_.reserve (threads);
        for (int i = 0; i < threads; ++i)
            threads_.emplace_back (std::bind (&ThreadPool::threadEntry, this, i));
    }

    ~ThreadPool ()
    {
        {
            // Unblock any threads and tell them to stop
            std::unique_lock <std::mutex> l (lock_);

            shutdown_ = true;
            condVar_.notify_all();
        }

        // Wait for all threads to stop
        std::cerr << "Joining threads" << std::endl;
        for (auto& thread : threads_)
            thread.join();
    }

    void doJob (std::function <void (void)> func)
    {
        // Place a job on the queu and unblock a thread
        std::unique_lock <std::mutex> l (lock_);

        jobs_.emplace (std::move (func));
        condVar_.notify_one();
    }

    protected:

    void threadEntry (int i)
    {
        std::function <void (void)> job;

        while (1)
        {
            {
                std::unique_lock <std::mutex> l (lock_);

                while (! shutdown_ && jobs_.empty())
                    condVar_.wait (l);

                if (jobs_.empty ())
                {
                    // No jobs to do and we are shutting down
                    std::cerr << "Thread " << i << " terminates" << std::endl;
                    return;
                 }

                std::cerr << "Thread " << i << " does a job" << std::endl;
                job = std::move (jobs_.front ());
                jobs_.pop();
            }

            // Do the job without holding any locks
            job ();
        }

    }

    std::mutex lock_;
    std::condition_variable condVar_;
    bool shutdown_;
    std::queue <std::function <void (void)>> jobs_;
    std::vector <std::thread> threads_;
};

void silly (int n)
{
    // A silly job for demonstration purposes
    std::cerr << "Sleeping for " << n << " seconds" << std::endl;
    std::this_thread::sleep_for (std::chrono::seconds (n));
}

int main()
{
    // Create two threads
    ThreadPool p (2);

    // Assign them 4 jobs
    p.doJob (std::bind (silly, 1));
    p.doJob (std::bind (silly, 2));
    p.doJob (std::bind (silly, 3));
    p.doJob (std::bind (silly, 4));
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...