Jump to content

Sending messages from worker thread


Recommended Posts

As I've already noticed, components use message systems to communicate with each other.

Message is handled in that same thread it has been send from.

I am trying to make async pathfinding requests being processed in worker thread.

The problem is that pathfinder returns path results as messages, and therefore I need to send a message from worker thread and have it handled in main thread(otherwise strange things can happen.)

I came up with a solution to create a global array of messages and send them in main loop of the game.

I've tried to create a global array of messages and a loop in Frame() function to process them, but there are two problems:

1. If you want to push data to queue located in main thread from worker thread, then you would need to lock main thread. Is it safe? And how to get mutex class of main thread?

2. How to send message from Frame() function? I don't know how to do it from function outside component class (in Frame() function i can't call GetSimState().GetComponentManager() ).

I've created class CCmpPathfinderWorker, which is going to handle worker thread for pathfinding.

1. :

// imagine this function is executed in worker thread in CCmpPathfinderWorker class

void CCmpPathfinder::ProcessShortRequests(const std::vector<AsyncShortPathRequest>& shortRequests)
{
    for (size_t i = 0; i < shortRequests.size(); ++i)
    {
        WaypointPath path;
        ControlGroupMovementObstructionFilter filter(shortRequests[i].avoidMovingUnits, shortRequests[i].group);
        ComputeShortPath(filter, shortRequests[i].x0, shortRequests[i].z0, shortRequests[i].clearance,
                         shortRequests[i].range, shortRequests[i].goal, shortRequests[i].passClass, path);
        CMessagePathResult msg(shortRequests[i].ticket, path);
        m_Parent->GetSimContext().GetComponentManager().PostMessage(shortRequests[i].notify, msg); // <-- here is the problem
        // m_Parent is a pointer to CCmpPathfinder class instance that is hosting this CCmpPathfinderWorker instance
    }
}

2. :

This is what I wanted to do:

// somewhere is Frame() function in main.cpp

//.....

for(std::pair<entity_id_t, CMessage> i : some_global_messages_queue)
{
    SomeMagicFunctionThatWouldMakeItWork().PostMessage(i.first, i.second);
}

//.....

 

Because "SomeMagicFunctionThatWouldMakeItWork()" sadly :( doesn't exists this is probably wrong way to do it. So how would be correct?

I guess this is not implemented* because no one before needed to send messages from thread to thread (not sure).

I could implement it if it isn't. But would be nice if you share some of your knowledge about this problem with me.

EDIT:

Maybe we should implement this as a component? If it isn't already implemented somehow. I will try do something with it tomorrow.

 

* - I guess there is no implemented way to send messages from worker thread and process it on main thread.

Edited by Kuba386
Link to comment
Share on other sites

Passing events one-by-one isn't a good idea, because it significantly decrease performance. At least they should be processed by enough big blocks. But, I suppose we don't need all these event messages on the pathfinder thread side at all. You only need a graph-data and requests. So it's possible to add an interface of the pathfinder worker thread to the main thread. This interface will aggregate all only needed events, convert them into a more compact data and pass to the worker thread. Then it'll receive batches of answers and convert to messages.

I didn't look much at our pathfinder code, so I can be wrong somewhere. I think @wraitii could help much more here.

 

  • Like 1
  • Thanks 1
Link to comment
Share on other sites

12 hours ago, vladislavbelov said:

Passing events one-by-one isn't a good idea, because it significantly decrease performance.

Well, 'passing events one-by-one' is exactly what is 0ad code actually doing.

void CComponentManager::PostMessage(entity_id_t ent, const CMessage& msg)
{
	PROFILE2_IFSPIKE("Post Message", 0.0005);
	PROFILE2_ATTR("%s", msg.GetScriptHandlerName());
	// Send the message to components of ent, that subscribed locally to this message
	std::map<MessageTypeId, std::vector<ComponentTypeId> >::const_iterator it;
	it = m_LocalMessageSubscriptions.find(msg.GetType());
	if (it != m_LocalMessageSubscriptions.end())
	{
		std::vector<ComponentTypeId>::const_iterator ctit = it->second.begin();
		for (; ctit != it->second.end(); ++ctit)
		{
			// Find the component instances of this type (if any)
			std::map<ComponentTypeId, std::map<entity_id_t, IComponent*> >::const_iterator emap = m_ComponentsByTypeId.find(*ctit);
			if (emap == m_ComponentsByTypeId.end())
				continue;

			// Send the message to all of them
			std::map<entity_id_t, IComponent*>::const_iterator eit = emap->second.find(ent);
			if (eit != emap->second.end())
				eit->second->HandleMessage(msg, false); // look, just calling HandleMessage() function.  
          // Main threads is actually calling HandleMessage() of all receivers. One-by-one. 
          // When you send a message you have to wait for all of your receivers to process it. Correct me if I'm wrong.
		}
	}

	SendGlobalMessage(ent, msg);
}

 

12 hours ago, vladislavbelov said:

But, I suppose we don't need all these event messages on the pathfinder thread side at all.

How would you notice CCmpUnitMotion class that async pathfinding is done and pass result?

 

12 hours ago, vladislavbelov said:

So it's possible to add an interface of the pathfinder worker thread to the main thread.

Actually all these messages are interface, because HandleMessage() function is interface function.

But if you just call interface function from worker thread it will execute in worker thread(If it won't, if it will somehow execute in main thread, then this topic is quite pointless).

12 hours ago, vladislavbelov said:

interface of the pathfinder worker thread to the main thread

Interface of pathfinder worker thread in main thread. This way pathfinder thread couldn't tell main thread about finished pathfinging. Main thread would have to check if worker thread already done it. This means it would have to check if pathfinder already finished computing path.

This actually seems to be not that bad solution. CCmpPathfinderWorker would have it's local 'result queue' and main thread would lock pathfinder worker thread and check if there is something in that queue. But when would it check that?(where in code?) Main advantage of PostMessage() function is that receiver is getting your message instantly. With that queue it wouldn't be instantly.

 

12 hours ago, vladislavbelov said:

I didn't look much at our pathfinder code, so I can be wrong somewhere.

Then maybe it's not you who are all these questions to. Anyway thanks for your answer. :)

 

EDIT:

Another thing just came to my mind.

When we send message about path result to CCmpUnitMotion, then it only does some updates.

Whet would happen if we just lock main thread, send message, have it processed in worker thread, do all updates to CCmpUnitMotion in worker thread, and then unlock main thread and continue work?

Only thing that I'm not sure of is what would happen if we lock main thread, and can we even do that? Won't it result in a deadlock or something?

Edited by Kuba386
  • Like 1
Link to comment
Share on other sites

> Well, 'passing events one-by-one' is exactly what is 0ad code actually doing.

Well, PostMessage sends game events with the relevant data, such as a unit dying, ownership changing or whatever to other componenets. Not necessarily for communicating all kinds of data.

Edited by Guest
Link to comment
Share on other sites

3 hours ago, Imarok said:

Already seen D14?

No, I haven't seen it before. Will read it soon.

 

But now I have other problem.

I've added some code, it passes compilation phase, but there are errors when linking pyrogenesis (undefined references to things I have added to ThreadUtil.h)

==== Building pyrogenesis (release) ====
main.cpp
Linking pyrogenesis
/usr/bin/ld: ../../../binaries/system/libsimulation2.a(CCmpPathfinder.o):(.data.rel.ro._ZTV14CCmpPathfinder[_ZTV14CCmpPathfinder]+0xc0): undefined reference to `CCmpPathfinder::ComputeShortPath(IObstructionTestFilter const&, CFixed<int, 2147483647, 32, 15, 16, 65536>, CFixed<int, 2147483647, 32, 15, 16, 65536>, CFixed<int, 2147483647, 32, 15, 16, 65536>, CFixed<int, 2147483647, 32, 15, 16, 65536>, PathGoal const&, unsigned short, WaypointPath&)'
/usr/bin/ld: ../../../binaries/system/libsimulation2.a(CCmpPathfinder_Worker.o): in function `CCmpPathfinderWorker::ProcessLongRequests(std::vector<AsyncLongPathRequest, std::allocator<AsyncLongPathRequest> > const&)':
/home/kuba/0ad-source/0ad/build/workspaces/gcc/../../../source/simulation2/components/CCmpPathfinder_Worker.cpp:676: undefined reference to `ThreadUtil::g_MainThreadMutex'
/usr/bin/ld: /home/kuba/0ad-source/0ad/build/workspaces/gcc/../../../source/simulation2/components/CCmpPathfinder_Worker.cpp:678: undefined reference to `ThreadUtil::g_MainThreadMutex'
/usr/bin/ld: ../../../binaries/system/libsimulation2.a(CCmpPathfinder_Worker.o): in function `CCmpPathfinderWorker::ProcessShortRequests(std::vector<AsyncShortPathRequest, std::allocator<AsyncShortPathRequest> > const&)':
/home/kuba/0ad-source/0ad/build/workspaces/gcc/../../../source/simulation2/components/CCmpPathfinder_Worker.cpp:695: undefined reference to `ThreadUtil::g_MainThreadMutex'
/usr/bin/ld: /home/kuba/0ad-source/0ad/build/workspaces/gcc/../../../source/simulation2/components/CCmpPathfinder_Worker.cpp:697: undefined reference to `ThreadUtil::g_MainThreadMutex'
/usr/bin/ld: ../../../binaries/system/libsimulation2.a(CCmpPathfinder_Vertex.o): in function `CCmpPathfinderWorker::ComputeShortPath(IObstructionTestFilter const&, CFixed<int, 2147483647, 32, 15, 16, 65536>, CFixed<int, 2147483647, 32, 15, 16, 65536>, CFixed<int, 2147483647, 32, 15, 16, 65536>, CFixed<int, 2147483647, 32, 15, 16, 65536>, PathGoal const&, unsigned short, WaypointPath&)':
/home/kuba/0ad-source/0ad/build/workspaces/gcc/../../../source/simulation2/components/CCmpPathfinder_Vertex.cpp:582: undefined reference to `ThreadUtil::g_MainThreadMutex'
/usr/bin/ld: ../../../binaries/system/libengine.a(GameSetup.o):/home/kuba/0ad-source/0ad/build/workspaces/gcc/../../../source/ps/GameSetup/GameSetup.cpp:873: more undefined references to `ThreadUtil::g_MainThreadMutex' follow
collect2: error: ld returned 1 exit status
make[1]: *** [pyrogenesis.make:85: ../../../binaries/system/pyrogenesis] Error 1
make: *** [Makefile:69: pyrogenesis] Error 2

Any idea what can I do with it?

Edited by Kuba386
Link to comment
Share on other sites

1 hour ago, stanislas69 said:

Try cleaning and updating workspaces again

I have already done that, but it didn't help.

The problem is however solved. There was a function declared, but not defined, and incorrectly used extern variable.

Next problem is that game deadlocks when map is loaded. I have some ideas how to solve it tough.

Link to comment
Share on other sites

Join the conversation

You can post now and register later. If you have an account, sign in now to post with your account.

Guest
Reply to this topic...

×   Pasted as rich text.   Paste as plain text instead

  Only 75 emoji are allowed.

×   Your link has been automatically embedded.   Display as a link instead

×   Your previous content has been restored.   Clear editor

×   You cannot paste images directly. Upload or insert images from URL.

 Share

×
×
  • Create New...