Ronan
Joined: 15 May 2011 Posts: 7
|
Posted: Sun May 15, 2011 5:04 Post subject: Non-blocking NWNXSetString("SQL", "EXEC" |
|
|
Problem:
Our servers share a common MySQL instance, and we write to our logs a lot, for example on every creature death. Since calls to the above function appear to be synchronous, this creates lag.
I was thinking I'd create a MySQL::AsyncExecute which schedules a MySQL::Execute call in another thread, then exits. However my C++ is extremely rusty, so I thought I'd ask here if anyone has had this problem and found a solution?
Thanks for any help. |
|
|
|
Skywing
Joined: 03 Jan 2008 Posts: 321
|
Posted: Sun May 15, 2011 19:36 Post subject: |
|
|
Could you instead buffer your log writes until some upper time limit has elapsed or you have accumulated a large number of log writes, and then write them out as one large query (or as a series of queries glued together in the same query string)? This would collapse the round trip time for issuing queries to just one per query batch instead of one per query.
This seems like it would be better performance-wise anyway, and it avoids you having the headache of how to manage communication with NWScript from another thread other than the server main thread. |
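A minimal sketch of the buffering idea, assuming a hypothetical `QueryBatcher` helper (the class, its method names, and the thresholds are illustrative and not part of NWNX or the MySQL plugin). It collects statements and hands them back as one semicolon-delimited string once either a count limit or an age limit is reached:

```cpp
#include <chrono>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper (not an NWNX API): buffers SQL statements and joins
// them into a single query string when a threshold is reached.
class QueryBatcher {
public:
    using Clock = std::chrono::steady_clock;

    QueryBatcher(std::size_t maxStatements, std::chrono::milliseconds maxAge)
        : maxStatements_(maxStatements), maxAge_(maxAge) {}

    // Queue one statement and remember when the oldest pending one arrived.
    void Add(const std::string &sql, Clock::time_point now = Clock::now()) {
        if (pending_.empty()) {
            oldest_ = now;
        }
        pending_.push_back(sql);
    }

    // True once the count limit or the age limit has been hit.
    bool ShouldFlush(Clock::time_point now = Clock::now()) const {
        if (pending_.empty()) {
            return false;
        }
        return pending_.size() >= maxStatements_ || (now - oldest_) >= maxAge_;
    }

    // Join all pending statements with "; " and clear the buffer.
    std::string Flush() {
        std::string batch;
        for (std::size_t i = 0; i < pending_.size(); ++i) {
            if (i != 0) {
                batch += "; ";
            }
            batch += pending_[i];
        }
        pending_.clear();
        return batch;
    }

private:
    std::size_t maxStatements_;
    std::chrono::milliseconds maxAge_;
    std::vector<std::string> pending_;
    Clock::time_point oldest_;
};
```

The caller would check `ShouldFlush()` from a timer or heartbeat and pass the result of `Flush()` to a single execute call. This presumes that the connection accepts several statements in one query string and that each statement has already been escaped.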
|
|
|
MaxRock
Joined: 24 Jan 2008 Posts: 196
|
Posted: Sun May 15, 2011 20:14 Post subject: |
|
|
For data like logs which isn't queried again it should actually work...
CMyLog.h
Code: |
#pragma once
#include <cstring> // strlen, strcpy
#include <deque>
#include <windows.h>
#include <fstream>
#include <sstream>
#include <iomanip>
#pragma warning(disable : 4996)
namespace MyLog {
class CMessage {
public:
CMessage(char *sMsg) {
bProcessed = 0;
Msg = new char[strlen(sMsg)+1];
strcpy(Msg, sMsg);
}
~CMessage() {
delete[] Msg;
}
int bProcessed;
char *Msg;
};
class CMyLogQue {
public:
CMyLogQue(DWORD nSleepTime, DWORD dwCreationFlags, char *Path);
~CMyLogQue();
private:
HANDLE hThread;
DWORD thID;
int bStopThread;
int bThreadRunning;
int bLock;
DWORD SleepTime;
std::deque<CMessage*> MessageQue;
std::deque<CMessage*> TempQue;
void ClearProcessedItems();
DWORD WriteData();
void WriteMsg(char *input);
std::ofstream logfile;
static DWORD WINAPI ThreadWrapper(void *PTHIS) {
CMyLogQue *pTHIS = (CMyLogQue*)PTHIS;
return pTHIS->WriteData();
}
public:
void AddData(char *sMsg);
void StartThread() {
if (!bThreadRunning && hThread) {
ResumeThread(hThread);
}
}
void bKillThread() {
if (bThreadRunning) {
bStopThread = 1;
}
}
int GetIsThreadRunning() {
return bThreadRunning;
}
};
}
|
CMyLog.cpp
Code: |
#include "StdAfx.h"
#include "CMyLog.h"
namespace MyLog {
CMyLogQue::CMyLogQue(DWORD nSleepTime, DWORD dwCreationFlags, char *Path) {
bStopThread = 0;
SleepTime = nSleepTime;
logfile.open(Path);
logfile.close();
logfile.open(Path, std::fstream::in | std::fstream::app);
//create worker thread here
hThread = CreateThread(NULL, 0, ThreadWrapper, this, dwCreationFlags, &thID);
if (dwCreationFlags == CREATE_SUSPENDED)
bThreadRunning = 0;
else
bThreadRunning = 1;
bLock=0;
}
CMyLogQue::~CMyLogQue() {
// kill our thread
TerminateThread(hThread, 0);
//write out data from the ques
std::deque<CMessage*>::iterator it = MessageQue.begin();
while (it != MessageQue.end()) {
if (!(*it)->bProcessed) {
WriteMsg((*it)->Msg);
(*it)->bProcessed = 1;
}
++it;
}
it = TempQue.begin();
while (it != TempQue.end()) {
WriteMsg((*it)->Msg);
++it;
}
}
// main worker thread
DWORD CMyLogQue::WriteData() {
while (!bStopThread) {
if (!bLock) {
bLock = 1; // no changes to the que
std::deque<CMessage*>::iterator it = MessageQue.begin();
while (it != MessageQue.end()) {
if (!(*it)->bProcessed) {
WriteMsg((*it)->Msg);
(*it)->bProcessed = 1; // mark this message as processed
}
++it;
}
bLock = 0; // allow changes to the que
}
Sleep(SleepTime); // put thread to sleep
}
bThreadRunning = 0;
return 1;
}
//
void CMyLogQue::AddData(char *sMsg) {
CMessage *Message = new CMessage(sMsg);
if (!bLock) {
bLock = 1;
ClearProcessedItems(); // remove already processed items from the que
MessageQue.insert(MessageQue.end(), TempQue.begin(), TempQue.end()); // add our temporary que to the main one
TempQue.clear();
MessageQue.push_back(Message); // push new message on the que
bLock = 0;
}
else { // if the main que is being processed by the worker thread, add message to the temp que
TempQue.push_back(Message);
}
}
void CMyLogQue::ClearProcessedItems() {
std::deque<CMessage*>::iterator it = MessageQue.begin();
while (it != MessageQue.end() && (*it)->bProcessed == 1) {
MessageQue.pop_front();
it = MessageQue.begin();
}
}
void CMyLogQue::WriteMsg(char *input) {
std::stringstream sDate;
std::stringstream sOut;
SYSTEMTIME stTime;
GetLocalTime(&stTime);
sDate << "[" << std::setfill('0') << stTime.wYear << "-" << std::setw(2) << stTime.wMonth << "-" << std::setw(2) << stTime.wDay << " " << std::setw(2) << stTime.wHour << ":" << std::setw(2) << stTime.wMinute << ":" << std::setw(2) << stTime.wSecond << "] ";
sOut << sDate.str() << input; // stream the formatted date text, not the stringstream object itself
logfile << sOut.str();
logfile.flush();
}
}
|
It should be possible to adapt this to call mysql instead of writing data to a log file.
The question, of course, is whether it would be worth it, given the context switching caused by the thread and the deque manipulation, which still happens on the main thread rather than in the worker. |
|
|
|
Skywing
Joined: 03 Jan 2008 Posts: 321
|
Posted: Sun May 15, 2011 20:22 Post subject: |
|
|
If you perform synchronous database operations with the same database connection as the main thread uses, I don't think you'll see much of a benefit here. Other database systems (e.g. postgres/Oracle) don't allow multiple synchronous operations on the same database connection context at least; I suspect MySQL doesn't either. Unless it is well documented and defined in MySQL what the behavior is when issuing synchronous operations on a DB connection while asynchronous operations are pending on the same connection, it'll probably be necessary to maintain another connection.
Note that you shouldn't be using TerminateThread. This has a chance to hang the process when your destructor runs based on your code (for example, it could kill the thread while it is holding the process heap lock). Instead, you must cleanly shut down the worker thread by signalling it to exit gracefully (perhaps by setting an event that the thread waits on). |
|
|
|
MaxRock
Joined: 24 Jan 2008 Posts: 196
|
Posted: Sun May 15, 2011 20:57 Post subject: |
|
|
Skywing wrote: | If you perform synchronous database operations with the same database connection as the main thread uses, I don't think you'll see much of a benefit here. Other database systems (e.g. postgres/Oracle) don't allow multiple synchronous operations on the same database connection context at least; I suspect MySQL doesn't either. Unless it is well documented and defined in MySQL what the behavior is when issuing synchronous operations on a DB connection while asynchronous operations are pending on the same connection, it'll probably be necessary to maintain another connection. |
You're right. I actually ripped this out of a plugin which creates another mysql connection besides the nwnx_odbc one, even though the code posted doesn't actually do any mysql calls.
Skywing wrote: |
Note that you shouldn't be using TerminateThread. This has a chance to hang the process when your destructor runs based on your code (for example, it could kill the thread while it is holding the process heap lock). Instead, you must cleanly shut down the worker thread by signalling it to exit gracefully (perhaps by setting an event that the thread waits on). |
How about setting bStopThread = 1; in which case Sleep isn't called and the function just exits. I think the problem was that the whole process ended and the worker thread never ended...
Anyways... Your suggestion of simply collecting all the log messages up to a certain point and then making one call to mysql would be better in any case. Less overhead and no context switching. |
|
|
|
virusman
Joined: 30 Jan 2005 Posts: 1020 Location: Russia
|
Posted: Sun May 15, 2011 21:14 Post subject: |
|
|
Try using INSERT DELAYED. _________________ In Soviet Russia, NWN plays you! |
|
|
|
Skywing
Joined: 03 Jan 2008 Posts: 321
|
Posted: Sun May 15, 2011 21:31 Post subject: |
|
|
Yes, I would try and coalesce the writes as my first try here.
For reference, this is how I would rewrite the add data / thread procedure to be safer. Note that the usage of bLock is also not safe (under CL) as is without declaring it volatile, as plain reads/writes don't have acquire/release semantics, and so manipulations of the locked data structures could pass ahead of the tests and updates to bLock. As a good practice, it's better to use a real lock here than a volatile spinlock. (I don't know what gcc's guarantees about memory model consistency with volatile are; using a real lock gets one out of this sort of problem.)
Code: | // assume CRITICAL_SECTION CritSec in class definition, initialized with InitializeCriticalSection and deleted with DeleteCriticalSection
// assume HANDLE Event in class definition, initialized with CreateEvent( NULL, FALSE, FALSE, NULL )
// assume bStopThread is qualified as volatile (required so that the write is made visible to the worker thread before SetEvent)
void CMyLogQueue::AddData(char *sMsg) {
//
// Note, take care when performing allocations that may throw while under a lock.
//
CMessage * Msg = new CMessage( sMsg );
EnterCriticalSection( &CritSec );
try
{
MessageQueue.push_back( Msg ); // deque::insert needs a position; append to the tail instead
}
catch (const std::exception &)
{
//
// We exhausted memory adding it to the queue, delete the new message
// context and handle the failure.
//
delete Msg;
Msg = NULL;
}
LeaveCriticalSection( &CritSec );
if (Msg != NULL) {
//
// Wake the worker thread if we actually added a message.
//
SetEvent( Event );
}
}
DWORD CMyLogQueue::WriteData() {
for (;;) {
//
// Wait for a signal to quit or a signal that data has been placed in the
// queue.
//
WaitForSingleObject( Event, INFINITE );
//
// Note, bStopThread must be volatile. Otherwise a race exists where the
// write to bStopThread in the dtor may be observed by this thread after
// the event is set. This means we would never shutdown the thread.
//
if (bStopThread) {
break;
}
EnterCriticalSection( &CritSec );
while (!MessageQueue.empty( ))
{
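CMessage * Msg = MessageQueue.front( ); // oldest entry first, so the log keeps its order
MessageQueue.pop_front( ); // pop_* returns void, so fetch and remove separately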
//
// Carefully drop the lock across the writes which may be slow, so
// that we do not block callers across the slow write which is the
// purpose of the asynchronous queue. Note that because bStopThread
// only transitions from false to true, and because we do not reset
// the event except by issuing the wait on it at the top of the loop,
// there is no danger that notifications may be lost.
//
// An alternative implementation would make a local copy of the queue
// contents instead of grabbing the lock for every removal, but this
// must deal with resource exhaustion during the copy.
//
LeaveCriticalSection( &CritSec );
WriteMsg( Msg->Msg );
delete Msg;
EnterCriticalSection( &CritSec );
}
LeaveCriticalSection( &CritSec );
}
return 0;
}
CMyLogQueue::~CMyLogQueue()
{
//
// Note, this destructor must not be invoked from a callstack that involves
// a DllMain callout (i.e. a global variable in a DLL). This is because the
// worker thread will block on the loader lock while trying to issue a
// DLL_PROCESS_DETACH callout while we hold the loader lock waiting for the
// thread to quit. If we're not under DllMain there is no such problem.
//
bStopThread = true;
SetEvent( Event );
WaitForSingleObject( hThread, INFINITE );
CloseHandle( hThread );
CloseHandle( Event );
DeleteCriticalSection( &CritSec );
//
// Note, any lingering queue entries must be deleted here.
//
} |
Last edited by Skywing on Sun May 15, 2011 21:37; edited 3 times in total |
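The alternative mentioned in the comments above (moving the queue contents to a local container instead of re-taking the lock for every removal) can be sketched portably with C++11 primitives. This is only an illustration under stated assumptions: `AsyncLogQueue` and its `sink` callback are hypothetical names, `std::mutex` and `std::condition_variable` stand in for the critical section and event, and unlike the code above the worker drains any remaining entries before it exits.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical sketch: a queue whose worker thread swaps out all pending
// messages under the lock and writes them with the lock released.
class AsyncLogQueue {
public:
    explicit AsyncLogQueue(std::function<void(const std::string &)> sink)
        : sink_(std::move(sink)), stop_(false), worker_(&AsyncLogQueue::Run, this) {}

    ~AsyncLogQueue() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true; // written under the lock, so no volatile is needed
        }
        wake_.notify_one();
        worker_.join(); // wait for a clean exit instead of terminating the thread
    }

    // Called from the main thread; only holds the lock for the push.
    void Add(std::string msg) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(msg));
        }
        wake_.notify_one();
    }

private:
    void Run() {
        std::deque<std::string> local;
        for (;;) {
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wake_.wait(lock, [this] { return stop_ || !queue_.empty(); });
                if (queue_.empty()) {
                    return; // only reachable once stop_ is set and all work is done
                }
                local.swap(queue_); // take everything in one step
            }
            for (const std::string &m : local) {
                sink_(m); // slow write happens without the lock held
            }
            local.clear();
        }
    }

    std::function<void(const std::string &)> sink_;
    std::mutex mutex_;
    std::condition_variable wake_;
    std::deque<std::string> queue_;
    bool stop_;
    std::thread worker_; // declared last so it starts after the other members exist
};
```

The sink would be whatever performs the slow operation, such as a file write or a query on a separate database connection. Messages are delivered in the order they were added because each swap takes the whole queue at once.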
|
|
|
Skywing
Joined: 03 Jan 2008 Posts: 321
|
Posted: Sun May 15, 2011 21:32 Post subject: |
|
|
virusman wrote: | Try using INSERT DELAYED. |
Does this still involve a round trip to the database server or is it entirely processed locally? If it still talks to the database server, we don't solve the problem of hiding the server latency here. |
|
|
|
virusman
Joined: 30 Jan 2005 Posts: 1020 Location: Russia
|
Posted: Sun May 15, 2011 21:56 Post subject: |
|
|
With INSERT DELAYED, the server responds immediately, without doing actual I/O: it can significantly reduce lock time. _________________ In Soviet Russia, NWN plays you! |
|
|
|
Skywing
Joined: 03 Jan 2008 Posts: 321
|
Posted: Sun May 15, 2011 22:25 Post subject: |
|
|
Sounds like it would be useful to combine with the write coalescing (which amortizes the round trip costs). |
|
|
|
Ronan
Joined: 15 May 2011 Posts: 7
|
Posted: Mon May 16, 2011 2:23 Post subject: |
|
|
Thanks for the advice. I did just implement a FIFO queue in nwscript, so I'll store the writes on that and pop a few of them out at set intervals.
I am really spoiled by the actor frameworks in other languages and would prefer not to mess with threading if possible.
In order to use INSERT DELAYED to its fullest, I'm guessing I'd have to create a new function with the following removed:
Code: | // eat any leftover resultsets so mysql does not get out of sync
while (mysql_more_results(&mysql))
{
mysql_next_result(&mysql);
newResult = mysql_store_result(&mysql);
mysql_free_result(newResult);
} |
(since we don't care about responses)
and:
Code: | // store the resultset in local memory
newResult = mysql_store_result(&mysql);
if (newResult == NULL)
{
...
{
// successfully retrieved the resultset
mysql_free_result(result);
result = newResult;
row = NULL;
num_fields = mysql_num_fields(result);
} |
I can't find any way to tell MySQL not to return anything in response to INSERT queries, so I'm guessing this with INSERT DELAYED would still return a boolean. But we can't ignore that because other functions in the plugin look like they pop unretrieved resultsets from the queue? |
|
|
|
Skywing
Joined: 03 Jan 2008 Posts: 321
|
Posted: Mon May 16, 2011 2:59 Post subject: |
|
|
Are you combining the queries into one query when popping them out, or just issuing them back to back? You'll need to combine them into one query to gain the benefits of hiding server to database latency; otherwise you still have a full round trip to the database with every query, which is likely a significant contributor to execution time if the server isn't local. Just spreading the queries out doesn't reduce the amount of work done, unlike combining the queries (if that is what you were doing), although it helps with not blocking the whole nwn2server main thread for such a long duration at once.
I believe that you must still consume the resultset even when using INSERT DELAYED. (i.e. you wouldn't want to write a function to not consume the resultsets or you'll likely get an out of sync error, or so the MySQL manual indicates). |
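The round-trip argument above can be put into rough numbers. The function below is a simplified cost model, not a measurement: `EstimateBlockingMs` is a hypothetical name, and the model assumes that each batch costs exactly one network round trip and that database-side execution time per statement is unchanged by batching.

```cpp
#include <cstddef>

// Illustrative cost model only. Total blocking time is one round trip per
// batch plus a fixed execution cost per statement.
double EstimateBlockingMs(std::size_t queries, std::size_t batchSize,
                          double roundTripMs, double execMsPerQuery) {
    if (queries == 0 || batchSize == 0) {
        return 0.0;
    }
    // Ceiling division: a partial final batch still costs a full round trip.
    const std::size_t roundTrips = (queries + batchSize - 1) / batchSize;
    return roundTrips * roundTripMs + queries * execMsPerQuery;
}
```

With made-up inputs of 100 log writes, a 5 ms round trip, and 0.5 ms of execution per statement, issuing them one at a time comes to 550 ms in this model, while batches of 25 come to 70 ms. Spreading the unbatched queries over time leaves the 550 ms total unchanged and only splits it into smaller stalls.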
|
|
|
Ronan
Joined: 15 May 2011 Posts: 7
|
Posted: Mon May 16, 2011 4:35 Post subject: |
|
|
Skywing, right now I'm just queuing them up so they won't all be sent at once, but yes I was planning on combining groups of them into one.
If we have to retrieve the result sets after each INSERT, then it seems like the DELAYED keyword probably won't help very much? I'm thinking the biggest issue here is network latency, not the response time of the db. |
|
|
|
Skywing
Joined: 03 Jan 2008 Posts: 321
|
Posted: Mon May 16, 2011 6:52 Post subject: |
|
|
Ronan wrote: | Skywing, right now I'm just queuing them up so they won't all be sent at once, but yes I was planning on combining groups of them into one.
If we have to retrieve the result sets after each INSERT, then it seems like the DELAYED keyword probably won't help very much? I'm thinking the biggest issue here is network latency, not the response time of the db. |
My understanding is that INSERT DELAYED would help avoid blocking for locks in the database if there were concurrent writers, and helps avoid blocking on database-side disk I/O opportunistically. It appears to still require a server response for each INSERT DELAYED query. Combining the queries into a multiply batched query (or batching them within a single query if they are all inserts into the same table) will mitigate round trip time (network) latency.
It looks like it might be necessary to make some very minor changes to the MySQL plugin to enable multiple query support (i.e. semicolon delimited queries) if it doesn't already set the server option (don't have the source in front of me to check). |
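For the case where all the inserts go into the same table, the batching can be done inside a single statement by listing several rows after VALUES. A minimal sketch, assuming a hypothetical `BuildMultiRowInsert` helper and made-up table and column names; the values are assumed to be escaped already (for example with the MySQL client library's `mysql_real_escape_string`), which this sketch does not do itself:

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: builds one multi-row INSERT statement. Every value is
// quoted as a string literal and must already be escaped by the caller.
std::string BuildMultiRowInsert(const std::string &table,
                                const std::vector<std::string> &columns,
                                const std::vector<std::vector<std::string> > &rows) {
    if (columns.empty() || rows.empty()) {
        return std::string(); // nothing to insert
    }
    std::string sql = "INSERT INTO " + table + " (";
    for (std::size_t c = 0; c < columns.size(); ++c) {
        if (c != 0) {
            sql += ", ";
        }
        sql += columns[c];
    }
    sql += ") VALUES ";
    for (std::size_t r = 0; r < rows.size(); ++r) {
        if (rows[r].size() != columns.size()) {
            return std::string(); // malformed row: refuse to build a query
        }
        if (r != 0) {
            sql += ", ";
        }
        sql += "(";
        for (std::size_t c = 0; c < rows[r].size(); ++c) {
            if (c != 0) {
                sql += ", ";
            }
            sql += "'" + rows[r][c] + "'";
        }
        sql += ")";
    }
    return sql;
}
```

Because the result is a single statement, this form does not depend on semicolon-delimited multi-query support being enabled on the connection.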
|
|
|
Ronan
Joined: 15 May 2011 Posts: 7
|
Posted: Mon Jun 20, 2011 17:07 Post subject: |
|
|
Skywing,
Adding semicolons in between INSERTS and combining them into a single string worked fine for me. The result was huge drops in the time spent in log-writing scripts, according to the profiler. |
|
|
|
|