OpenWrt Forum Archive

Topic: lost/dirty data when use ubus in multi-thread without thread lock

The content of this topic has been archived on 22 Apr 2018. There are no obvious gaps in this topic, but there may still be some posts missing at the end.

Hi,
We use dbus in our project, and want to replace it with ubus right now, but I have some concerns about how to use ubus in multi-thread mode.

I find that ubus use a global blob_buf to send message, if I invoke ubus APIs in multi-thread mode, the data may be dirty or lost, especially invoke synchronous.

the following is my example ubus_test_multithread.c :

/*
 * gcc ubus_test_multithread.c -o ubus_test_multithread -O0 -g -lubus -lubox -lpthread -I. -I./ubus -L./libubox/build -L./ubus/build -Wl,-rpath,./libubox/build -Wl,-rpath,./ubus/build
 */

#include <pthread.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include "libubus.h"

/* Indexes into the parsed-attribute table for the "hello" method. */
enum {
    HELLO_ID,   /* "id"  — 32-bit caller identifier (thread id in this demo) */
    HELLO_MSG,  /* "msg" — free-form string payload */
    __HELLO_MAX
};

/* blobmsg parsing policy: maps attribute names/types to the indexes above. */
static const struct blobmsg_policy hello_multithread_policy[] = {
    [HELLO_ID] = { .name = "id", .type = BLOBMSG_TYPE_INT32 },
    [HELLO_MSG] = { .name = "msg", .type = BLOBMSG_TYPE_STRING },
};

/*
 * Handler for the "hello" method on the test.multithread object.
 *
 * Parses the optional "id" (u32) and "msg" (string) attributes from the
 * incoming blob and prints them so the invoking client thread can be
 * identified in the server's output.  Always returns 0 (success).
 */
static int test_hello(struct ubus_context *ctx, struct ubus_object *obj,
              struct ubus_request_data *req, const char *method,
              struct blob_attr *msg)
{
    struct blob_attr *attrs[__HELLO_MAX];
    int sender_id = 0;                 /* fallback when "id" is absent */
    const char *text = "(unknown)";    /* fallback when "msg" is absent */

    blobmsg_parse(hello_multithread_policy,
                  ARRAY_SIZE(hello_multithread_policy),
                  attrs, blob_data(msg), blob_len(msg));

    if (attrs[HELLO_ID] != NULL)
        sender_id = (int)blobmsg_get_u32(attrs[HELLO_ID]);
    if (attrs[HELLO_MSG] != NULL)
        text = blobmsg_data(attrs[HELLO_MSG]);

    printf("[msg]0x%x:%s\n", sender_id, text);

    return 0;
}

/* Method table for the test.multithread object ("hello" only). */
static const struct ubus_method test_multithread_methods[] = {
    UBUS_METHOD("hello", test_hello, hello_multithread_policy),
};

/* Object type derived from the method table above. */
static struct ubus_object_type test_multithread_object_type =
    UBUS_OBJECT_TYPE("test.multithread", test_multithread_methods);

/* The bus object that server_main() registers with ubusd. */
static struct ubus_object test_multithread_object = {
    .name = "test.multithread",
    .type = &test_multithread_object_type,
    .methods = test_multithread_methods,
    .n_methods = ARRAY_SIZE(test_multithread_methods),
};

/*
 * Server mode: register the test.multithread object on the bus, then
 * service incoming requests via the uloop event loop (blocks here).
 * A registration failure is reported but the loop still runs, matching
 * the demo's original behavior.
 */
static void server_main(struct ubus_context *ctx)
{
    int err;

    printf("%s:%d\n", __FUNCTION__, __LINE__);

    err = ubus_add_object(ctx, &test_multithread_object);
    if (err != 0)
        fprintf(stderr, "Failed to add object: %s\n", ubus_strerror(err));

    uloop_run();
}

static uint32_t g_objID;
static void *client_thread_proc(void *data) {
    char buffer[10];
    struct ubus_request req;
    struct blob_buf privateBlobBuf;
    struct ubus_context *ctx = (struct ubus_context *)data;
    uint32_t tID = pthread_self();
    uint32_t objID;
    int index;
    int result;

    printf("%s:%d tID=0x%x\n", __FUNCTION__, __LINE__, tID);
    memset(&privateBlobBuf, 0, sizeof(privateBlobBuf));
    if (g_objID) {
        objID = g_objID;
    } else {
        result = ubus_lookup_id(ctx, "test.multithread", &objID);
        if (result) {
            fprintf(stderr, "Failed to lookup object: %s\n", ubus_strerror(result));
            return NULL;
        }
    }
    
    for (index = 0; index < 9; index++) {
        snprintf(buffer, sizeof(buffer), "%09d", index);
        blob_buf_init(&privateBlobBuf, 0);
        blobmsg_add_u32(&privateBlobBuf, "id", tID);
        blobmsg_add_string(&privateBlobBuf, "msg", buffer);

        ubus_invoke_async(ctx, objID, "hello", privateBlobBuf.head, &req);
    }

    blob_buf_free(&privateBlobBuf);

    return NULL;
}

/*
 * Client mode: look up the server object once, cache its id in g_objID,
 * then spawn the worker threads that each invoke "hello" nine times.
 * Runs uloop afterwards so the process stays alive to drain socket events.
 *
 * Fixes over the original: the ubus_lookup_id() result is now checked —
 * previously a failed lookup left g_objID == 0, sending every worker into
 * the per-thread lookup path that the author reports as hanging — and
 * pthread_create() failures are reported instead of silently ignored.
 */
static void client_main(struct ubus_context *ctx) {
    pthread_t thread_clients[1];
    size_t index;   /* size_t avoids the signed/unsigned sizeof comparison */
    int ret;

    ret = ubus_lookup_id(ctx, "test.multithread", &g_objID);
    if (ret) {
        fprintf(stderr, "Failed to lookup object: %s\n", ubus_strerror(ret));
        return;
    }
    printf("%s:%d g_objID=0x%x\n", __FUNCTION__, __LINE__, g_objID);

    for (index = 0; index < sizeof(thread_clients) / sizeof(thread_clients[0]); index++) {
        ret = pthread_create(&thread_clients[index], NULL,
                             client_thread_proc, (void *)ctx);
        if (ret)
            fprintf(stderr, "pthread_create failed: %d\n", ret);
    }

    uloop_run();
}

/*
 * Entry point.  Usage: ubus_test_multithread [-s socket] [-c]
 *   -s <path>  connect to an alternate ubusd socket (default: NULL,
 *              i.e. the library's built-in socket path)
 *   -c         run in client mode; without it the program runs as server
 * Returns 0 on normal exit, -1 if the ubus connection fails.
 */
int main(int argc, char **argv)
{
    struct ubus_context *ctx;
    const char *ubus_socket = NULL;
    int clientMode = 0;
    int opt;

    while ((opt = getopt(argc, argv, "cs:")) != -1) {
        switch (opt) {
        case 's':
            ubus_socket = optarg;
            break;
        case 'c':
            clientMode = 1;
            break;
        default:
            /* unknown options are ignored, as in the original demo */
            break;
        }
    }

    argc -= optind;
    argv += optind;

    uloop_init();
    /* A peer closing its socket must not kill the process. */
    signal(SIGPIPE, SIG_IGN);

    ctx = ubus_connect(ubus_socket);
    if (ctx == NULL) {
        fprintf(stderr, "Failed to connect to ubus\n");
        return -1;
    }

    ubus_add_uloop(ctx);

    if (clientMode)
        client_main(ctx);
    else
        server_main(ctx);

    ubus_free(ctx);
    uloop_done();

    return 0;
}

Start ubusd in the background first, and then:
*******/ubus# ./ubus_test_multithread &
[1] 17591
*******/ubus# server_main:59

*******/ubus# ./ubus_test_multithread -c
client_main:109 g_objID=0x5653e569
client_thread_proc:78 tID=0x20dc7700
client_thread_proc:78 tID=0x205c6700
[msg]0x20dc7700:000000000
[msg]0x205c6700:000000000
[msg]0x20dc7700:000000001
[msg]0x20dc7700:000000002
[msg]0x20dc7700:000000002
[msg]0x20dc7700:000000003
[msg]0x20dc7700:000000003
[msg]0x20dc7700:000000004
[msg]0x20dc7700:000000004
[msg]0x20dc7700:000000005
[msg]0x20dc7700:000000005
[msg]0x20dc7700:000000006
[msg]0x20dc7700:000000006
[msg]0x205c6700:000000006

I create two threads in client mode, and they both invoke hello@server 9 times, but the output looks like the above — it doesn't make sense.

And if I call ubus_lookup_id in each thread, all threads hang, because they can't get any feedback in ubus_complete_request.

So, does it mean that I have to use a thread lock when using ubus in multi-thread mode?

I have no idea what I can do with ubus, but just curious about your test program. So, I downloaded and gave it a try to natively compile it on my Seagate GoFLEX Home without a problem. However, when I executed the test program, it gives an error message as shown below:

[goflex@GoFLEX:/tmp 11%] ~ arm-openwrt-linux-uclibcgnueabi-gcc ubus_test_multithread.c -o ubus_test_multithread -O0 -g -lubus -lubox -lpthread
1.680u+0.200s=0:02.32e(81.0%) TDSavg=0k+0k+0k max=13896k 0+0io 3pf+0sw
[goflex@GoFLEX:/tmp 12%] ~ ps|grep ubusd
20554 root      1028 S    /sbin/ubusd
21511 goflex    1404 S    grep ubusd
[goflex@GoFLEX:/tmp 13%] ~ ./ubus_test_multithread 
Failed to connect to ubus
[goflex@GoFLEX:/tmp 14%] ~

(Last edited by mazilo on 21 Sep 2015, 04:27)

Hi mazilo,

the example should be run as the root user (UBUS_UNIX_SOCKET requires it; I can't find the reference documentation about this at present, but I am sure I got the same error log when I ran it as a non-root user).

thanks for your feedback, I forgot to figure this out in original post.

OK and it works. However, can you please help me on how to create n threads in client mode, so they will invoke hello@server x times?

[root@GoFLEX:/tmp 3%] # ./ubus_test_multithread 
server_main:58

(Last edited by mazilo on 21 Sep 2015, 14:27)

hi mazilo,

    sorry for the late response.

the sample code creates one thread in client mode; if you want to create more threads, please modify the thread_clients array in the client_main function (line 105), like this:
pthread_t thread_clients[2];   // change from 1 to 2


by the way, the server mode should run in background, and the client mode run in foreground
*******/ubus# ./ubus_test_multithread &
*******/ubus# ./ubus_test_multithread -c


thanks a lot !

eddid wrote:

hi mazilo,

    sorry for response late.

the sample code create one thread in client mode, if you want to create more threads, please modify the thread_clients arry in client_main function(line 105), like this:
pthread_t thread_clients[2];   // change from 1 to 2


by the way, the server mode should run in background, and the client mode run in foreground
*******/ubus# ./ubus_test_multithread &
*******/ubus# ./ubus_test_multithread -c


thanks a lot !

OK and this is what I get:

[root@GoFLEX:/tmp 7%] # ./ubus_test_multithread & ./ubus_test_multithread -c
[1] 22820
server_main:58
client_main:114 g_objID=0xbdd26263
client_thread_proc:78 tID=0xb6c21530
[msg]0xb6c21530:000000000
[msg]0xb6c21530:000000001
[msg]0xb6c21530:000000002
[msg]0xb6c21530:000000003
[msg]0xb6c21530:000000004
[msg]0xb6c21530:000000005
[msg]0xb6c21530:000000006
[msg]0xb6c21530:000000007
client_thread_proc:78 tID=0xb6e21530
[msg]0xb6c21530:000000008
[msg]0xb6e21530:000000000
[msg]0xb6e21530:000000001
[msg]0xb6e21530:000000002
[msg]0xb6e21530:000000003
[msg]0xb6e21530:000000004
[msg]0xb6e21530:000000005
[msg]0xb6e21530:000000006
[msg]0xb6e21530:000000007
[msg]0xb6e21530:000000008

TBH, I have no idea but just wanted to see if I can natively compile (on my Seagate GoFLEX home unit) to reproduce the same thing.

(Last edited by mazilo on 22 Sep 2015, 14:45)

Hi mazilo,
thanks for your test.
your test result seems correct, but I still have concerns, since one thread runs and invokes ubus, and only then does the other thread run.
I test on an Ubuntu server with 36-core CPU, and all threads can invoke dbus almost at the same time.
by the way, could you do more test with a modification(line 90):
    for (index = 0; index < 30; index++) { //change from 9 to 30 or more
I want to see the log when multiple threads invoke ubus in a conflicting situation.

thank you, mazilo !

(Last edited by eddid on 23 Sep 2015, 09:45)

Eddid: the ubus library is not thread-safe and not written with thread-safety in mind. Either use a global lock or one instance per thread.

jow wrote:

Eddid: the ubus library is not thread-safe and not written with thread-safety in mind. Either use a global lock or one instance per thread.

thank you, jow !

I can understand the solution of adding a global lock for invoke ubus APIs, but it will reduce efficiency, the application may invoke ubus method very frequently.

Could you give more information about one instance per thread? In my mind, there would be one ubus loop per process even if I create one instance per thread; they would use the same loop and the same global blob_buf in ubus, so the conflict would still exist.

thank you again, jow !

Hi All,

The discussions I went through regarding libubox and thread-safety are pretty old, and I am curious to know whether there is any thread-safe version of libubox already available on the Internet, or whether any work has been done to address this feature.

It would be a great help if someone could provide me some input on this.

Thanks in advance!!!

The discussion might have continued from here.