Codec2: second round of refactoring and implemented proper access contention in codec_startEncode()

This commit is contained in:
Silvano Seva 2023-07-17 16:39:23 +02:00
parent 5b3e136127
commit e610979ac7
1 changed files with 113 additions and 107 deletions

View File

@ -37,8 +37,9 @@ static bool running;
static bool reqStop;
static pthread_t codecThread;
static pthread_mutex_t mutex;
static pthread_cond_t wakeup_cond;
static pthread_mutex_t data_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t init_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t wakeup_cond = PTHREAD_COND_INITIALIZER;
static uint8_t readPos;
static uint8_t writePos;
@ -55,87 +56,46 @@ static const uint8_t micGainPost = 4;
static void *encodeFunc(void *arg);
static void *decodeFunc(void *arg);
static void startThread(void *(*func) (void *));
static bool startThread(const pathId path, void *(*func) (void *));
static void stopThread();
void codec_init()
{
pthread_mutex_lock(&init_mutex);
initCnt += 1;
pthread_mutex_unlock(&init_mutex);
if(initCnt > 0)
{
pthread_mutex_lock(&mutex);
initCnt += 1;
pthread_mutex_unlock(&mutex);
return;
}
else
{
initCnt = 1;
}
running = false;
readPos = 0;
writePos = 0;
numElements = 0;
memset(dataBuffer, 0x00, BUF_SIZE * sizeof(uint64_t));
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&wakeup_cond, NULL);
}
void codec_terminate()
{
pthread_mutex_lock(&mutex);
pthread_mutex_lock(&init_mutex);
initCnt -= 1;
pthread_mutex_unlock(&mutex);
pthread_mutex_unlock(&init_mutex);
if(initCnt > 0) return;
if(running) stopThread();
if(initCnt > 0)
return;
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&wakeup_cond);
if(running)
stopThread();
}
bool codec_startEncode(const pathId path)
{
// Bad incoming path
if(audioPath_getStatus(path) != PATH_OPEN)
return false;
if(running)
{
if(audioPath_getStatus(audioPath) == PATH_OPEN)
return false;
else
stopThread();
}
running = true;
audioPath = path;
startThread(encodeFunc);
return true;
return startThread(path, encodeFunc);
}
bool codec_startDecode(const pathId path)
{
// Bad incoming path
if(audioPath_getStatus(path) != PATH_OPEN)
return false;
if(running)
{
if(audioPath_getStatus(audioPath) == PATH_OPEN)
return false;
else
stopThread();
}
running = true;
audioPath = path;
startThread(decodeFunc);
return true;
return startThread(path, decodeFunc);
}
void codec_stop(const pathId path)
@ -161,16 +121,16 @@ int codec_popFrame(uint8_t *frame, const bool blocking)
return -EAGAIN;
// Blocking call: wait until some data is pushed
pthread_mutex_lock(&mutex);
pthread_mutex_lock(&data_mutex);
while(numElements == 0)
{
pthread_cond_wait(&wakeup_cond, &mutex);
pthread_cond_wait(&wakeup_cond, &data_mutex);
}
element = dataBuffer[readPos];
readPos = (readPos + 1) % BUF_SIZE;
numElements -= 1;
pthread_mutex_unlock(&mutex);
pthread_mutex_unlock(&data_mutex);
// Do memcpy after mutex unlock to reduce execution time spent inside the
// critical section
@ -195,21 +155,18 @@ int codec_pushFrame(const uint8_t *frame, const bool blocking)
return -EAGAIN;
// Blocking call: wait until there is some free space
pthread_mutex_lock(&mutex);
pthread_mutex_lock(&data_mutex);
while(numElements >= BUF_SIZE)
{
pthread_cond_wait(&wakeup_cond, &mutex);
pthread_cond_wait(&wakeup_cond, &data_mutex);
}
// There is free space, push data into the queue
dataBuffer[writePos] = element;
writePos = (writePos + 1) % BUF_SIZE;
// Signal that the queue is not empty
if(numElements == 0) pthread_cond_signal(&not_empty);
numElements += 1;
pthread_mutex_unlock(&mutex);
pthread_mutex_unlock(&data_mutex);
return 0;
}
@ -229,6 +186,7 @@ static void *encodeFunc(void *arg)
STREAM_INPUT | BUF_CIRC_DOUBLE);
if(iStream < 0)
{
pthread_detach(pthread_self());
running = false;
return NULL;
}
@ -243,51 +201,56 @@ static void *encodeFunc(void *arg)
break;
dataBlock_t audio = inputStream_getData(iStream);
if(audio.data == NULL)
break;
if(audio.data != NULL)
#ifndef PLATFORM_LINUX
// Pre-amplification stage
for(size_t i = 0; i < audio.len; i++) audio.data[i] *= micGainPre;
// DC removal
dsp_dcRemoval(&dcrState, audio.data, audio.len);
// Post-amplification stage
for(size_t i = 0; i < audio.len; i++) audio.data[i] *= micGainPost;
#endif
// CODEC2 encodes 160ms of speech into 8 bytes: here we write the
// new encoded data into a buffer of 16 bytes writing the first
// half and then the second one, sequentially.
// Data ready flag is rised once all the 16 bytes contain new data.
uint64_t frame = 0;
codec2_encode(codec2, ((uint8_t*) &frame), audio.data);
pthread_mutex_lock(&data_mutex);
// If buffer is full erase the oldest frame
if(numElements >= BUF_SIZE)
{
#ifndef PLATFORM_LINUX
// Pre-amplification stage
for(size_t i = 0; i < audio.len; i++) audio.data[i] *= micGainPre;
// DC removal
dsp_dcRemoval(&dcrState, audio.data, audio.len);
// Post-amplification stage
for(size_t i = 0; i < audio.len; i++) audio.data[i] *= micGainPost;
#endif
// CODEC2 encodes 160ms of speech into 8 bytes: here we write the
// new encoded data into a buffer of 16 bytes writing the first
// half and then the second one, sequentially.
// Data ready flag is rised once all the 16 bytes contain new data.
uint64_t frame = 0;
codec2_encode(codec2, ((uint8_t*) &frame), audio.data);
pthread_mutex_lock(&mutex);
// If buffer is full erase the oldest frame
if(numElements >= BUF_SIZE)
{
readPos = (readPos + 1) % BUF_SIZE;
}
dataBuffer[writePos] = frame;
writePos = (writePos + 1) % BUF_SIZE;
if(numElements == 0)
pthread_cond_signal(&wakeup_cond);
if(numElements < BUF_SIZE)
numElements += 1;
pthread_mutex_unlock(&mutex);
readPos = (readPos + 1) % BUF_SIZE;
}
dataBuffer[writePos] = frame;
writePos = (writePos + 1) % BUF_SIZE;
if(numElements == 0)
pthread_cond_signal(&wakeup_cond);
if(numElements < BUF_SIZE)
numElements += 1;
pthread_mutex_unlock(&data_mutex);
}
audioStream_terminate(iStream);
codec2_destroy(codec2);
// In case thread terminates due to invalid path or stream error, detach it
// to ensure that its memory gets freed by the OS.
if(reqStop == false)
pthread_detach(pthread_self());
running = false;
return NULL;
}
@ -304,6 +267,7 @@ static void *decodeFunc(void *arg)
STREAM_OUTPUT | BUF_CIRC_DOUBLE);
if(oStream < 0)
{
pthread_detach(pthread_self());
running = false;
return NULL;
}
@ -326,7 +290,7 @@ static void *decodeFunc(void *arg)
uint64_t frame = 0;
bool newData = false;
pthread_mutex_lock(&mutex);
pthread_mutex_lock(&data_mutex);
if(numElements != 0)
{
@ -339,9 +303,11 @@ static void *decodeFunc(void *arg)
newData = true;
}
pthread_mutex_unlock(&mutex);
pthread_mutex_unlock(&data_mutex);
stream_sample_t *audioBuf = outputStream_getIdleBuffer(oStream);
if(audioBuf == NULL)
break;
if(newData)
{
@ -365,16 +331,52 @@ static void *decodeFunc(void *arg)
audioStream_stop(oStream);
codec2_destroy(codec2);
// In case thread terminates due to invalid path or stream error, detach it
// to ensure that its memory gets freed by the OS.
if(reqStop == false)
pthread_detach(pthread_self());
running = false;
return NULL;
}
static void startThread(void *(*func) (void *))
static bool startThread(const pathId path, void *(*func) (void *))
{
// Bad incoming path
if(audioPath_getStatus(path) != PATH_OPEN)
return false;
// Handle access contention when starting the codec thread to ensure that
// only one call at a time can effectively start the thread.
pthread_mutex_lock(&init_mutex);
if(running)
{
// New path takes over the current one only if it has an higher priority
// or the current one is closed/suspended.
pathInfo_t newPath = audioPath_getInfo(path);
pathInfo_t curPath = audioPath_getInfo(audioPath);
if((curPath.status == PATH_OPEN) && (curPath.prio >= newPath.prio))
{
pthread_mutex_unlock(&init_mutex);
return false;
}
else
{
stopThread();
}
}
running = true;
audioPath = path;
pthread_mutex_unlock(&init_mutex);
readPos = 0;
writePos = 0;
numElements = 0;
reqStop = false;
int ret = 0;
#ifdef _MIOSIX
// Set stack size of CODEC2 thread to 16kB.
pthread_attr_t codecAttr;
@ -387,11 +389,15 @@ static void startThread(void *(*func) (void *))
pthread_attr_setschedparam(&codecAttr, &param);
// Start thread
pthread_create(&codecThread, &codecAttr, func, ((void *) audioPath));
ret = pthread_create(&codecThread, &codecAttr, func, ((void *) audioPath));
#else
pthread_create(&codecThread, NULL, func, ((void *) audioPath));
ret = pthread_create(&codecThread, NULL, func, ((void *) audioPath));
#endif
if(ret < 0)
running = false;
return running;
}
static void stopThread()