thread-pool: Convert thread_pool_aiocb_info.cancel to cancel_async
The .cancel_async shares the same the first half with .cancel: try to steal the request if not submitted yet. In this case set the elem to THREAD_DONE status and ret to -ECANCELED, which means thread_pool_completion_bh will call the cb with -ECANCELED. If the request is already submitted, do nothing, as we know the normal completion will happen in the future. Testing code update: Before, done_cb is only called if the request is already submitted by thread pool. Now done_cb is always called, even before it is submitted, because we emulate bdrv_aio_cancel with bdrv_aio_cancel_async. So also update the test criteria accordingly. Signed-off-by: Fam Zheng <famz@redhat.com> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
This commit is contained in:
		
							parent
							
								
									f600ac1902
								
							
						
					
					
						commit
						3391f5e51c
					
				| @ -33,7 +33,7 @@ static int long_cb(void *opaque) | ||||
| static void done_cb(void *opaque, int ret) | ||||
| { | ||||
|     WorkerTestData *data = opaque; | ||||
|     g_assert_cmpint(data->ret, ==, -EINPROGRESS); | ||||
|     g_assert(data->ret == -EINPROGRESS || data->ret == -ECANCELED); | ||||
|     data->ret = ret; | ||||
|     data->aiocb = NULL; | ||||
| 
 | ||||
| @ -132,7 +132,7 @@ static void test_submit_many(void) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| static void test_cancel(void) | ||||
| static void do_test_cancel(bool sync) | ||||
| { | ||||
|     WorkerTestData data[100]; | ||||
|     int num_canceled; | ||||
| @ -170,18 +170,25 @@ static void test_cancel(void) | ||||
|     for (i = 0; i < 100; i++) { | ||||
|         if (atomic_cmpxchg(&data[i].n, 0, 3) == 0) { | ||||
|             data[i].ret = -ECANCELED; | ||||
|             bdrv_aio_cancel(data[i].aiocb); | ||||
|             active--; | ||||
|             if (sync) { | ||||
|                 bdrv_aio_cancel(data[i].aiocb); | ||||
|             } else { | ||||
|                 bdrv_aio_cancel_async(data[i].aiocb); | ||||
|             } | ||||
|             num_canceled++; | ||||
|         } | ||||
|     } | ||||
|     g_assert_cmpint(active, >, 0); | ||||
|     g_assert_cmpint(num_canceled, <, 100); | ||||
| 
 | ||||
|     /* Canceling the others will be a blocking operation.  */ | ||||
|     for (i = 0; i < 100; i++) { | ||||
|         if (data[i].aiocb && data[i].n != 3) { | ||||
|             bdrv_aio_cancel(data[i].aiocb); | ||||
|             if (sync) { | ||||
|                 /* Canceling the others will be a blocking operation.  */ | ||||
|                 bdrv_aio_cancel(data[i].aiocb); | ||||
|             } else { | ||||
|                 bdrv_aio_cancel_async(data[i].aiocb); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
| @ -193,15 +200,25 @@ static void test_cancel(void) | ||||
|     for (i = 0; i < 100; i++) { | ||||
|         if (data[i].n == 3) { | ||||
|             g_assert_cmpint(data[i].ret, ==, -ECANCELED); | ||||
|             g_assert(data[i].aiocb != NULL); | ||||
|             g_assert(data[i].aiocb == NULL); | ||||
|         } else { | ||||
|             g_assert_cmpint(data[i].n, ==, 2); | ||||
|             g_assert_cmpint(data[i].ret, ==, 0); | ||||
|             g_assert(data[i].ret == 0 || data[i].ret == -ECANCELED); | ||||
|             g_assert(data[i].aiocb == NULL); | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| static void test_cancel(void) | ||||
| { | ||||
|     do_test_cancel(true); | ||||
| } | ||||
| 
 | ||||
| static void test_cancel_async(void) | ||||
| { | ||||
|     do_test_cancel(false); | ||||
| } | ||||
| 
 | ||||
| int main(int argc, char **argv) | ||||
| { | ||||
|     int ret; | ||||
| @ -217,6 +234,7 @@ int main(int argc, char **argv) | ||||
|     g_test_add_func("/thread-pool/submit-co", test_submit_co); | ||||
|     g_test_add_func("/thread-pool/submit-many", test_submit_many); | ||||
|     g_test_add_func("/thread-pool/cancel", test_cancel); | ||||
|     g_test_add_func("/thread-pool/cancel-async", test_cancel_async); | ||||
| 
 | ||||
|     ret = g_test_run(); | ||||
| 
 | ||||
|  | ||||
| @ -31,7 +31,6 @@ enum ThreadState { | ||||
|     THREAD_QUEUED, | ||||
|     THREAD_ACTIVE, | ||||
|     THREAD_DONE, | ||||
|     THREAD_CANCELED, | ||||
| }; | ||||
| 
 | ||||
| struct ThreadPoolElement { | ||||
| @ -58,7 +57,6 @@ struct ThreadPool { | ||||
|     AioContext *ctx; | ||||
|     QEMUBH *completion_bh; | ||||
|     QemuMutex lock; | ||||
|     QemuCond check_cancel; | ||||
|     QemuCond worker_stopped; | ||||
|     QemuSemaphore sem; | ||||
|     int max_threads; | ||||
| @ -73,7 +71,6 @@ struct ThreadPool { | ||||
|     int idle_threads; | ||||
|     int new_threads;     /* backlog of threads we need to create */ | ||||
|     int pending_threads; /* threads created but not running yet */ | ||||
|     int pending_cancellations; /* whether we need a cond_broadcast */ | ||||
|     bool stopping; | ||||
| }; | ||||
| 
 | ||||
| @ -113,9 +110,6 @@ static void *worker_thread(void *opaque) | ||||
|         req->state = THREAD_DONE; | ||||
| 
 | ||||
|         qemu_mutex_lock(&pool->lock); | ||||
|         if (pool->pending_cancellations) { | ||||
|             qemu_cond_broadcast(&pool->check_cancel); | ||||
|         } | ||||
| 
 | ||||
|         qemu_bh_schedule(pool->completion_bh); | ||||
|     } | ||||
| @ -173,7 +167,7 @@ static void thread_pool_completion_bh(void *opaque) | ||||
| 
 | ||||
| restart: | ||||
|     QLIST_FOREACH_SAFE(elem, &pool->head, all, next) { | ||||
|         if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) { | ||||
|         if (elem->state != THREAD_DONE) { | ||||
|             continue; | ||||
|         } | ||||
|         if (elem->state == THREAD_DONE) { | ||||
| @ -217,22 +211,26 @@ static void thread_pool_cancel(BlockDriverAIOCB *acb) | ||||
|          */ | ||||
|         qemu_sem_timedwait(&pool->sem, 0) == 0) { | ||||
|         QTAILQ_REMOVE(&pool->request_list, elem, reqs); | ||||
|         elem->state = THREAD_CANCELED; | ||||
|         qemu_bh_schedule(pool->completion_bh); | ||||
|     } else { | ||||
|         pool->pending_cancellations++; | ||||
|         while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) { | ||||
|             qemu_cond_wait(&pool->check_cancel, &pool->lock); | ||||
|         } | ||||
|         pool->pending_cancellations--; | ||||
| 
 | ||||
|         elem->state = THREAD_DONE; | ||||
|         elem->ret = -ECANCELED; | ||||
|     } | ||||
| 
 | ||||
|     qemu_mutex_unlock(&pool->lock); | ||||
|     thread_pool_completion_bh(pool); | ||||
| } | ||||
| 
 | ||||
| static AioContext *thread_pool_get_aio_context(BlockDriverAIOCB *acb) | ||||
| { | ||||
|     ThreadPoolElement *elem = (ThreadPoolElement *)acb; | ||||
|     ThreadPool *pool = elem->pool; | ||||
|     return pool->ctx; | ||||
| } | ||||
| 
 | ||||
| static const AIOCBInfo thread_pool_aiocb_info = { | ||||
|     .aiocb_size         = sizeof(ThreadPoolElement), | ||||
|     .cancel             = thread_pool_cancel, | ||||
|     .cancel_async       = thread_pool_cancel, | ||||
|     .get_aio_context    = thread_pool_get_aio_context, | ||||
| }; | ||||
| 
 | ||||
| BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool, | ||||
| @ -299,7 +297,6 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) | ||||
|     pool->ctx = ctx; | ||||
|     pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool); | ||||
|     qemu_mutex_init(&pool->lock); | ||||
|     qemu_cond_init(&pool->check_cancel); | ||||
|     qemu_cond_init(&pool->worker_stopped); | ||||
|     qemu_sem_init(&pool->sem, 0); | ||||
|     pool->max_threads = 64; | ||||
| @ -342,7 +339,6 @@ void thread_pool_free(ThreadPool *pool) | ||||
| 
 | ||||
|     qemu_bh_delete(pool->completion_bh); | ||||
|     qemu_sem_destroy(&pool->sem); | ||||
|     qemu_cond_destroy(&pool->check_cancel); | ||||
|     qemu_cond_destroy(&pool->worker_stopped); | ||||
|     qemu_mutex_destroy(&pool->lock); | ||||
|     g_free(pool); | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user
	 Fam Zheng
						Fam Zheng