/*
 * It turns out that for bulk inserts, for MySQL the "LOAD DATA" statement is
 * by far the fastest method. In tests, it was about 5 times faster than
 * INSERTs with multiple VALUE lists.
 * Problem with the "LOAD DATA" method is that data needs to come from a file.
 * This is solved by creating a temporary file, writing formatted data to that file
 * and then loading the file using a LOAD DATA. In order to avoid taking up too much
 * disk space, the LOAD DATA is done when the file reaches a given size. After it
 * has been loaded, the file is truncated and filled again.
 */

#if defined(_WIN32) || defined(WITH_MYSQL)

/*
 * NOTE(review): the names of the system headers were missing from the text
 * this file was restored from. The list below is reconstructed from the
 * functions actually used here (unlink, tempnam, free, fopen/fputc/fclose,
 * strlen/strcat/strncat, the mysql_* client API) -- confirm it against the
 * original include list before merging.
 */
#if defined(_WIN32)
#include <windows.h>
#include <io.h>
#else
#include <unistd.h>
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mysql.h>

#include "dbmysql.h"
#include "iso.h"
#include "ripdb.h"
#include "util.h"

/* Maximum length (excluding the terminating NUL) of a formatted error message */
#define MAX_ERROR_LEN 254

/* Number of bytes written to the temp file after which a LOAD DATA is issued */
#define MAX_TEMP_FILE_SIZE (1024 * 1024)

/*
 * Builds "<pszMsg>: <pszDetail>" in pszBuffer, which must provide room for
 * MAX_ERROR_LEN + 1 bytes. The result is always NUL-terminated and silently
 * truncated to MAX_ERROR_LEN characters. pszDetail may be NULL, in which case
 * nothing is appended after the ": " separator.
 */
static void BuildMySqlErrorMessage(char *pszBuffer, const char *pszMsg, const char *pszDetail)
{
    pszBuffer[0] = '\0';
    /* strncat's count is the number of characters to APPEND, so it has to be
       recomputed from the space that is still free after every step. */
    strncat(pszBuffer, pszMsg, MAX_ERROR_LEN);
    strncat(pszBuffer, ": ", MAX_ERROR_LEN - strlen(pszBuffer));
    if (NULL != pszDetail)
    {
        strncat(pszBuffer, pszDetail, MAX_ERROR_LEN - strlen(pszBuffer));
    }
}

/*
 * Describes one output binding registered through DefineOutputColumn():
 * result column nCol is copied into pVar, and *pbIsNull receives whether
 * the column was NULL.
 */
typedef struct
{
    unsigned nCol;      /* zero-based column index in the result set */
    void *pVar;         /* destination buffer, may be NULL */
    unsigned maxlen;    /* limit checked against the column length before copying */
    bool *pbIsNull;     /* receives the NULL indicator, may be NULL */
} Output;

/*
 * Cursor implementation on top of the MySQL C client API.
 *
 * Inserts are not sent row by row: every Execute() appends one tab-separated
 * line to a temporary file, which is loaded with LOAD DATA LOCAL INFILE when
 * it reaches MAX_TEMP_FILE_SIZE bytes and once more when the cursor is destroyed.
 *
 * Select and action statements run either on the connection owned by the
 * MySqlDb (GetMySql() falls back to it while m_mysql is NULL) or on a private
 * connection opened by Connect().
 */
class MySqlCursor : public RipCursor
{
public:
    MySqlCursor(MySqlDb *pDatabase);
    ~MySqlCursor();

    virtual void PrepareInsert(char *pszTable, unsigned nFields, char **ppszFieldNames,
                               iso_char **ppszValues, unsigned *pnMaxLengths);
    virtual void PrepareSelect(char *pszStatement);
    virtual void PrepareAction(char *pszStatement);
    virtual void DefineOutputColumn(unsigned nCol, char *pszVar, unsigned maxlen, bool *pbIsNull);
    virtual void Execute();
    virtual bool Fetch();

protected:
    void GetResults();
    void ProcessOutput(Output *output, MYSQL_ROW row, unsigned long *lengths);
    void Connect();
    void LoadData();
    void HandleError(char *msg);
    MYSQL *GetMySql();
    char *Expand();

    MYSQL *m_mysql;             /* private connection, NULL while the database's one is used */
    MySqlDb *m_pDatabase;       /* database this cursor was created by */
    MYSQL_RES *m_resultset;     /* stored result of the last select, NULL until fetched */
    unsigned m_nOutputCount;    /* number of entries in m_pOutput */
    Output *m_pOutput;          /* output bindings, grown by DefineOutputColumn() */
    unsigned m_nFieldCount;     /* insert: number of fields; select: number of result columns */
    char **m_ppszValues;        /* insert: copy of the caller's value pointer array */
    FILE *m_fData;              /* insert: temp file being filled, NULL when closed */
    char *m_pszDataName;        /* insert: name of the temp file (allocated by tempnam) */
    unsigned long m_lTempSize;  /* insert: bytes written to the temp file since the last load */
};

/*
 * Creates an idle cursor bound to pDatabase. Nothing is opened or allocated
 * until one of the Prepare*() methods is called.
 */
MySqlCursor::MySqlCursor(MySqlDb *pDatabase)
    : RipCursor(),
      m_mysql(NULL),
      m_pDatabase(pDatabase),
      m_resultset(NULL),
      m_nOutputCount(0),
      m_pOutput(NULL),
      m_nFieldCount(0),
      m_ppszValues(NULL),
      m_fData(NULL),
      m_pszDataName(NULL),
      m_lTempSize(0)
{
}

/*
 * Loads whatever is still pending in the temp file, removes the file and
 * releases the result set, the bindings and the private connection.
 *
 * NOTE(review): LoadData() reports failures through HandleError(), which ends
 * in ThrowException(). If that throws, the exception leaves this destructor
 * (std::terminate under C++11 and later) -- confirm how callers expect a
 * failed final load to be reported.
 */
MySqlCursor::~MySqlCursor()
{
    if (NULL != m_fData)
    {
        fclose(m_fData);
        m_fData = NULL;
        LoadData();
    }
    if (NULL != m_pszDataName)
    {
        unlink(m_pszDataName);
        free(m_pszDataName);    /* tempnam() allocates with malloc */
        m_pszDataName = NULL;
    }
    if (NULL != m_resultset)
    {
        mysql_free_result(m_resultset);
    }
    if (NULL != m_ppszValues)
    {
        delete [] m_ppszValues;
    }
    if (NULL != m_pOutput)
    {
        delete [] m_pOutput;
    }
    if (NULL == m_mysql)
    {
        /* Only give up the shared connection if this cursor is the one that
           registered for it; insert cursors and select cursors that ran while
           another cursor was active never did. */
        if (m_pDatabase->m_pActiveCursor == this)
        {
            m_pDatabase->m_pActiveCursor = NULL;
        }
    }
    else
    {
        mysql_close(m_mysql);
    }
}

/*
 * Prepares a bulk insert into pszTable: remembers the value pointers, creates
 * the temporary data file and builds the statement
 *   LOAD DATA LOCAL INFILE '<file>' INTO TABLE <table> (<field>,<field>,...)
 * in m_pszStatement. Backslashes in the file name are doubled.
 *
 * ppszValues is copied as an array of pointers only; Execute() reads the
 * strings they point to at the time it is called.
 *
 * On any exception the temp file is closed and removed before rethrowing.
 */
void MySqlCursor::PrepareInsert(char *pszTable, unsigned nFields, char **ppszFieldNames,
                                iso_char **ppszValues, unsigned *pnMaxLengths)
{
    static const char szInfile[] = "LOAD DATA LOCAL INFILE '";
    static const char szInto[] = "' INTO TABLE ";

    RipCursor::PrepareInsert(pszTable, nFields, ppszFieldNames, ppszValues, pnMaxLengths);

    delete [] m_ppszValues;     /* do not leak the array of an earlier PrepareInsert */
    m_ppszValues = new char *[nFields];
    m_nFieldCount = nFields;
    memcpy(m_ppszValues, ppszValues, m_nFieldCount * sizeof(char *));

    try
    {
        /* Generate a name for the data file */
        m_pszDataName = tempnam(".", "fr");
        if (NULL == m_pszDataName)
        {
            ThrowException(false, "Unable to generate temporary filename");
        }

        /* Create the data file */
        m_fData = fopen(m_pszDataName, "wb");
        if (NULL == m_fData)
        {
            ThrowException(true, "Unable to open temp file %s: ", m_pszDataName);
        }
        m_lTempSize = 0;

        /* Work out the statement length: fixed text, file name (with one
           extra byte per backslash), table name, field names and the commas
           between them. */
        size_t nSize = strlen(szInfile) + strlen(m_pszDataName)
                     + strlen(szInto) + strlen(pszTable)
                     + 2    /* " (" */
                     + 1;   /* ")"  */
        unsigned nField;
        for (nField = 0; nField < m_nFieldCount; nField++)
        {
            nSize += strlen(ppszFieldNames[nField]);
        }
        if (0 != m_nFieldCount)
        {
            nSize += m_nFieldCount - 1;
        }
        const char *p;
        for (p = m_pszDataName; '\0' != *p; p++)
        {
            if ('\\' == *p)
            {
                nSize++;
            }
        }

        /* Construct the LOAD DATA statement */
        m_pszStatement = new char[nSize + 1];
        strcpy(m_pszStatement, szInfile);
        char *q = m_pszStatement + strlen(m_pszStatement);
        for (p = m_pszDataName; '\0' != *p; p++)
        {
            if ('\\' == *p)
            {
                *q++ = '\\';
            }
            *q++ = *p;
        }
        *q = '\0';
        strcat(m_pszStatement, szInto);
        strcat(m_pszStatement, pszTable);
        strcat(m_pszStatement, " (");
        for (nField = 0; nField < m_nFieldCount; nField++)
        {
            if (0 != nField)
            {
                strcat(m_pszStatement, ",");
            }
            strcat(m_pszStatement, ppszFieldNames[nField]);
        }
        strcat(m_pszStatement, ")");
    }
    catch (...)
    {
        if (NULL != m_fData)
        {
            fclose(m_fData);
            m_fData = NULL;
        }
        if (NULL != m_pszDataName)
        {
            unlink(m_pszDataName);
            free(m_pszDataName);
            m_pszDataName = NULL;
        }
        throw;
    }
}

/*
 * Sends a select statement to the server. If no other cursor is registered
 * as the database's active cursor, this one registers itself.
 */
void MySqlCursor::PrepareSelect(char *pszStatement)
{
    RipCursor::PrepareSelect(pszStatement);

    if (NULL == m_mysql)
    {
        if (NULL != m_pDatabase->m_pActiveCursor)
        {
            /* NOTE(review): unlike PrepareAction(), the call below is
               commented out in the original, so a select issued while another
               cursor is active runs on the database's shared connection.
               Left as found -- confirm this is intentional. */
            // Connect();
        }
        else
        {
            m_pDatabase->m_pActiveCursor = this;
        }
    }
    if (mysql_query(GetMySql(), pszStatement))
    {
        m_pDatabase->HandleError("Unable to execute select query");
    }
}

/*
 * Sends an action statement to the server. If another cursor is already
 * registered as the database's active cursor, a private connection is opened
 * first; otherwise this cursor registers itself and uses the shared one.
 */
void MySqlCursor::PrepareAction(char *pszStatement)
{
    RipCursor::PrepareAction(pszStatement);

    if (NULL == m_mysql)
    {
        if (NULL != m_pDatabase->m_pActiveCursor)
        {
            Connect();
        }
        else
        {
            m_pDatabase->m_pActiveCursor = this;
        }
    }
    if (mysql_query(GetMySql(), pszStatement))
    {
        m_pDatabase->HandleError("Unable to execute action statement");
    }
}

/*
 * Registers pszVar / pbIsNull as the destination for result column nCol
 * (zero-based). The result set is fetched first if that has not happened yet,
 * so that nCol can be validated against the number of columns.
 */
void MySqlCursor::DefineOutputColumn(unsigned nCol, char *pszVar, unsigned maxlen, bool *pbIsNull)
{
    if (NULL == m_resultset)
    {
        GetResults();
    }
    if (m_nFieldCount <= nCol)
    {
        ThrowException(false, "Invalid nColumn number");
    }

    /* Grow the binding array by one entry */
    Output *pNewOutput = new Output[m_nOutputCount + 1];
    if (NULL != m_pOutput)
    {
        for (unsigned n = 0; n < m_nOutputCount; n++)
        {
            pNewOutput[n] = m_pOutput[n];
        }
        delete [] m_pOutput;
    }
    m_pOutput = pNewOutput;

    Output *pSlot = &m_pOutput[m_nOutputCount];
    pSlot->nCol = nCol;
    pSlot->pVar = pszVar;
    pSlot->maxlen = maxlen;
    pSlot->pbIsNull = pbIsNull;
    m_nOutputCount++;
}

/*
 * For an insert cursor: appends the current values as one tab-separated,
 * newline-terminated line to the temp file; an empty value is written as \N.
 * When the file has reached MAX_TEMP_FILE_SIZE bytes it is loaded and then
 * reopened (which truncates it). Does nothing for other statement types.
 *
 * NOTE(review): values are written verbatim; a tab, newline or backslash
 * inside a value is not escaped here -- presumably callers guarantee such
 * characters do not occur; confirm.
 */
void MySqlCursor::Execute()
{
    if (InsertStatement != m_stType)
    {
        return;
    }

    for (unsigned nField = 0; nField < m_nFieldCount; nField++)
    {
        if (0 != nField)
        {
            fputc('\t', m_fData);
            m_lTempSize++;
        }
        const char *pszValue = m_ppszValues[nField];
        if ('\0' == pszValue[0])
        {
            pszValue = "\\N";
        }
        fputs(pszValue, m_fData);
        m_lTempSize += strlen(pszValue);
    }
    fputc('\n', m_fData);
    m_lTempSize++;

    if (MAX_TEMP_FILE_SIZE <= m_lTempSize)
    {
        /* Close file, load and reopen it, truncating it in the process.
           The handle is cleared before LoadData() so that a failure there
           cannot lead to the destructor closing it a second time. */
        fclose(m_fData);
        m_fData = NULL;
        LoadData();
        m_fData = fopen(m_pszDataName, "wb");
        if (NULL == m_fData)
        {
            ThrowException(true, "Unable to open temp file %s: ", m_pszDataName);
        }
        m_lTempSize = 0;
    }
}

/*
 * Fetches the next row of the result set into the registered output bindings.
 * On the first call without a result set, the result is retrieved and all
 * bindings are validated against the number of columns.
 *
 * Returns true if a row was fetched, false when there are no more rows.
 */
bool MySqlCursor::Fetch()
{
    Output *const pEnd = m_pOutput + m_nOutputCount;

    if (NULL == m_resultset)
    {
        GetResults();
        for (Output *pCheck = m_pOutput; pCheck < pEnd; pCheck++)
        {
            if (m_nFieldCount <= pCheck->nCol)
            {
                ThrowException(false, "Invalid nColumn number");
            }
        }
    }

    MYSQL_ROW row = mysql_fetch_row(m_resultset);
    if (NULL == row)
    {
        return false;
    }

    unsigned long *lengths = mysql_fetch_lengths(m_resultset);
    for (Output *pOutput = m_pOutput; pOutput < pEnd; pOutput++)
    {
        ProcessOutput(pOutput, row, lengths);
    }
    return true;
}

/*
 * Retrieves the complete result of the last query with mysql_store_result()
 * and records the number of columns in m_nFieldCount.
 */
void MySqlCursor::GetResults()
{
    m_resultset = mysql_store_result(GetMySql());
    if (NULL == m_resultset)
    {
        m_pDatabase->HandleError("Unable to fetch results");
    }
    m_nFieldCount = mysql_num_fields(m_resultset);
}

/*
 * Copies one column of row into the buffer of the given binding and sets the
 * binding's NULL indicator. A NULL column leaves the buffer untouched.
 *
 * NOTE(review): the check allows a value whose length equals maxlen, and
 * strcpy then writes that many bytes plus a terminating NUL. This is only
 * safe if maxlen excludes the terminator -- confirm against the callers.
 */
void MySqlCursor::ProcessOutput(Output *pOutput, MYSQL_ROW row, unsigned long *lengths)
{
    const char *pszColumn = row[pOutput->nCol];

    if (NULL != pszColumn && NULL != pOutput->pVar)
    {
        if (pOutput->maxlen < lengths[pOutput->nCol])
        {
            ThrowException(false, "Output buffer not large enough");
        }
        strcpy((char *) (pOutput->pVar), pszColumn);
    }
    if (NULL != pOutput->pbIsNull)
    {
        *(pOutput->pbIsNull) = (NULL == pszColumn);
    }
}

/*
 * Opens a private connection for this cursor, using the connection
 * parameters stored in the database object.
 */
void MySqlCursor::Connect()
{
    m_mysql = mysql_init(NULL);
    if (NULL == m_mysql)
    {
        HandleError("Unable to initialize connection");
    }
    if (NULL == mysql_real_connect(m_mysql,
                                   m_pDatabase->m_pszHost,
                                   m_pDatabase->m_pszUser,
                                   m_pDatabase->m_pszPasswd,
                                   m_pDatabase->m_pszDb,
                                   m_pDatabase->m_nPort,
                                   m_pDatabase->m_pszUnixSocket,
                                   m_pDatabase->m_nClientFlag))
    {
        HandleError("Unable to connect to database");
    }
}

/*
 * Returns the connection to use: the private one if it exists, otherwise
 * the connection owned by the database object.
 */
MYSQL *
MySqlCursor::GetMySql()
{
    if (NULL != m_mysql)
    {
        return m_mysql;
    }
    return m_pDatabase->m_mysql;
}

/*
 * Executes the prepared LOAD DATA statement, reading data from the temp file.
 * The file must have been closed by the caller.
 */
void MySqlCursor::LoadData()
{
    if (mysql_query(GetMySql(), m_pszStatement))
    {
        HandleError("Unable to LOAD DATA");
    }
}

/*
 * Reports an error as "<msg>: <MySQL error text>" through ThrowException().
 * Errors on the shared connection are delegated to the database object;
 * errors on the private connection are formatted here.
 */
void MySqlCursor::HandleError(char *msg)
{
    static char fullmsg[MAX_ERROR_LEN + 1];

    if (NULL == m_mysql)
    {
        m_pDatabase->HandleError(msg);
    }
    else
    {
        BuildMySqlErrorMessage(fullmsg, msg, mysql_error(m_mysql));
        /* The text comes from the server and may contain '%', so it must not
           be used as the format string itself. */
        ThrowException(false, "%s", fullmsg);
    }
}

/*
 * Creates an unconnected database object; Connect() establishes the
 * connection and fills in the stored connection parameters.
 */
MySqlDb::MySqlDb(RipObject *pFile /* = NULL */, RipDb *pMaster /* = NULL */)
    : RipDb(pFile, pMaster),
      m_mysql(NULL),
      m_pszHost(NULL),
      m_pszUser(NULL),
      m_pszPasswd(NULL),
      m_pszDb(NULL),
      m_pszUnixSocket(NULL),
      m_pActiveCursor(NULL)
{
}

/*
 * Closes the connection and frees the stored connection parameters.
 */
MySqlDb::~MySqlDb()
{
    Disconnect();

    if (NULL != m_pszUnixSocket)
    {
        delete [] m_pszUnixSocket;
        m_pszUnixSocket = NULL;
    }
    if (NULL != m_pszDb)
    {
        delete [] m_pszDb;
        m_pszDb = NULL;
    }
    if (NULL != m_pszPasswd)
    {
        delete [] m_pszPasswd;
        m_pszPasswd = NULL;
    }
    if (NULL != m_pszUser)
    {
        delete [] m_pszUser;
        m_pszUser = NULL;
    }
    if (NULL != m_pszHost)
    {
        delete [] m_pszHost;
        m_pszHost = NULL;
    }
}

/*
 * Creates a new MySqlDb for pFile with this object as its master.
 */
IOHandler *
MySqlDb::Create(RipObject *pFile)
{
    return new MySqlDb(pFile, this);
}

/*
 * Parses pszDriverData and connects to the server.
 *
 * pszDriverData is a list of name=value pairs separated by ',' or ';'.
 * Recognised names: host/h, database/db/d, user/u and
 * password/passwd/passw/pass/p. Anything else raises "Unknown MySql option".
 * Defaults are host "localhost", database and user "foonrip" and an empty
 * password. The string is modified in place (separators are overwritten
 * with NUL).
 *
 * After a successful connect the parameters are copied into the object so
 * that cursors can open additional connections with them.
 */
void MySqlDb::Connect(char *pszDriverData)
{
    const char *pszHost = "localhost";
    const char *pszDatabase = "foonrip";
    const char *pszUser = "foonrip";
    const char *pszPassword = "";
    int nPort = 0;
    const char *pszUnixSocket = NULL;
    int nClientFlag = 0;

    char *pOption = pszDriverData;
    while ('\0' != *pOption)
    {
        /* Split off the option name at '=' */
        char *pEnd = pOption;
        while ('\0' != *pEnd && '=' != *pEnd)
        {
            pEnd++;
        }
        if ('=' != *pEnd)
        {
            ThrowException(false, "Unknown MySql option %s", pOption);
        }
        *pEnd = '\0';

        /* Split off the value at the next separator or the end of the string */
        char *pValue = ++pEnd;
        while ('\0' != *pEnd && ',' != *pEnd && ';' != *pEnd)
        {
            pEnd++;
        }
        bool bLast = ('\0' == *pEnd);
        if (! bLast)
        {
            *pEnd = '\0';
        }

        if (0 == strcmp(pOption, "host") ||
            0 == strcmp(pOption, "h"))
        {
            pszHost = pValue;
        }
        else if (0 == strcmp(pOption, "database") ||
                 0 == strcmp(pOption, "db") ||
                 0 == strcmp(pOption, "d"))
        {
            pszDatabase = pValue;
        }
        else if (0 == strcmp(pOption, "user") ||
                 0 == strcmp(pOption, "u"))
        {
            pszUser = pValue;
        }
        else if (0 == strcmp(pOption, "password") ||
                 0 == strcmp(pOption, "passwd") ||
                 0 == strcmp(pOption, "passw") ||
                 0 == strcmp(pOption, "pass") ||
                 0 == strcmp(pOption, "p"))
        {
            pszPassword = pValue;
        }
        else
        {
            ThrowException(false, "Unknown MySql option %s", pOption);
        }

        /* Continue after the separator, or stop on the terminating NUL */
        pOption = (bLast ? pEnd : pEnd + 1);
    }

    m_mysql = mysql_init(NULL);
    if (NULL == m_mysql)
    {
        HandleError("Unable to initialize connection");
    }
    if (NULL == mysql_real_connect(m_mysql, pszHost, pszUser, pszPassword, pszDatabase,
                                   nPort, pszUnixSocket, nClientFlag))
    {
        HandleError("Unable to connect to database");
    }

    /* Keep copies of the parameters; the originals point into pszDriverData */
    if (NULL != pszHost)
    {
        m_pszHost = new char[strlen(pszHost) + 1];
        strcpy(m_pszHost, pszHost);
    }
    if (NULL != pszUser)
    {
        m_pszUser = new char[strlen(pszUser) + 1];
        strcpy(m_pszUser, pszUser);
    }
    if (NULL != pszPassword)
    {
        m_pszPasswd = new char[strlen(pszPassword) + 1];
        strcpy(m_pszPasswd, pszPassword);
    }
    if (NULL != pszDatabase)
    {
        m_pszDb = new char[strlen(pszDatabase) + 1];
        strcpy(m_pszDb, pszDatabase);
    }
    m_nPort = nPort;
    if (NULL != pszUnixSocket)
    {
        m_pszUnixSocket = new char[strlen(pszUnixSocket) + 1];
        strcpy(m_pszUnixSocket, pszUnixSocket);
    }
    m_nClientFlag = nClientFlag;
}

/*
 * Creates a new cursor on this database.
 */
RipCursor *
MySqlDb::CreateCursor()
{
    return new MySqlCursor(this);
}

/*
 * Closes the connection, if there is one.
 */
void MySqlDb::Disconnect()
{
    if (NULL != m_mysql)
    {
        mysql_close(m_mysql);
        m_mysql = NULL;
    }
}

/*
 * Reports an error on the database's connection as
 * "<msg>: <MySQL error text>" through ThrowException(). When there is no
 * connection handle (mysql_init failed) only "<msg>: " is reported.
 */
void MySqlDb::HandleError(char *msg)
{
    static char fullmsg[MAX_ERROR_LEN + 1];

    BuildMySqlErrorMessage(fullmsg, msg, (NULL != m_mysql ? mysql_error(m_mysql) : NULL));
    /* The text comes from the server and may contain '%', so it must not be
       used as the format string itself. */
    ThrowException(false, "%s", fullmsg);
}

#endif // defined(_WIN32) || defined(WITH_MYSQL)