Gearman分布式计算

教研室的OCR云服务现在有了新需求:分布式任务处理,由于识别引擎的计算过程比较耗时,也比较耗CPU,因此需要在多个机器上跑多个识别引擎,组成一个分布式计算系统,来应对高并发的识别请求。
这样一个分布式计算系统的框架结构是比较明确的,即分三个部分:调度中心、处理器和对外接口,如下图所示,任务经接口提交到调度中心,然后经调度中心分配到不同的处理器,处理器完成任务处理,将结果返回调度中心,调度中心再把结果发送到接口,这样就构成了一个完整的流程(红色线所示)。
本来想用RPC框架(如ThriftgRPC)自己来写,想想还是有点复杂的,如处理器的注册、fail-over、任务状态、数据格式等等,然后搜了一番,发现了Gearman这个开源库,一看介绍,太符合了我这个需求了,而且Instagram也用它,质量有保证,哈哈,就用它了!
简单介绍一下Gearman,先看这个图:
Gearman可以实现对传统C-S结构系统的分布式扩展,只需要把你的客户端代码和Client API结合,服务端代码和Worker API结合,你的系统就变成分布式的了。以前是客户端直接连接服务端,现在服务端变成了很多个worker,Client向Job Server提交任务,Job Server将任务分配到可用的Worker,再将Worker的处理结果返回给Client。下图是Gearman的官方例子reverse(字符串反转)的示意图:
一图胜千言,1、2、3、4标明的是数据流向,相信已经很清楚了吧。个人认为Gearman有两点非常不错:
首先是多语言支持,即Client API和Worker API都支持多种语言,从而能方便地实现跨语言环境的数据交换,比如服务器端用C++、网页端用PHP、Android用Java, iOS用C,等等,实际上只需要遵循Gearman的协议就能实现对应语言的API。
第二点就是系统的fail-over机制很强大,如下图:
不仅仅是Worker挂了没事,Job Server挂了也没事,Job Server挂了后,Client和Worker会自动切换到剩下的可用Job Server,也就是说只要有至少一个Job Server和至少一个Worker正常工作,Client提交的任务就能正常被处理。Worker可以随时添加和移除,比如说某台机器上跑了几个Worker,突然这台机器断电了,没关系,重启以后这些Worker又能继续投入工作了,而这一切对Client来说都是透明的,即Worker的规模可以随时进行在线扩展。
介绍就到这,接下来写代码!我用的Ubuntu 14.04 64位系统,首先编译Gearman,需要安装libiconv、boost、libevent、gperf、openssl、uuid、libdrizzle等依赖,make install完成后就可以用gearmand命令把Job Server跑起来,然后可以试试example文件夹下的示例程序。
然后是写Worker,这里贴一下关键代码,首先是创建worker实例:
1
2
3
4
5
6
gearman_worker_st *worker;
if ((worker= gearman_worker_create(NULL)) == NULL)
{
    std::cerr << "Memory allocation failure on worker creation." << std::endl;
    return EXIT_FAILURE;
}
然后添加Job Server,这里面是“add”而不是“set”,因为之前提到过,Job Server是可以多个一起工作的,所以一个Worker可以添加多个Job Server。添加Job Server只需指定主机和端口。

1
2
3
4
5
if (gearman_failed(gearman_worker_add_server(worker, host.c_str(), port)))
{
    std::cerr << gearman_worker_error(worker) << std::endl;
    return EXIT_FAILURE;
}
接着设置Worker的function,JOB_NAME是function的标识,相当于对外的接口名称

1
2
3
4
5
6
7
8
9
10
gearman_function_t worker_fn= gearman_function_create(ocr_worker);
if (gearman_failed(gearman_worker_define_function(worker,
                                                  gearman_literal_param(JOB_NAME),
                                                  worker_fn,
                                                  0, 
                                                  NULL)))
{
    std::cerr << gearman_worker_error(worker) << std::endl;
    return EXIT_FAILURE;
}
最后在循环中执行function,即阻塞在gearman_worker_work,直到收到Job Server的任务请求

1
2
3
4
5
6
7
8
while (true)
{
    if (gearman_failed(gearman_worker_work(worker)))
    {
        std::cerr << gearman_worker_error(worker) << std::endl;
        break;
    }
}
然后看ocr_worker的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static gearman_return_t ocr_worker(gearman_job_st *job, void *context)
{
    const char *workload= (const char *)gearman_job_workload(job);
    const size_t workload_size= gearman_job_workload_size(job);
 
    std::string result_str;
 
    bmiocr::method_ret_t ret =  bmiocr::ocr_engine_run(workload, workload_size, result_str);
    if(!ret.is_ok)
    {
        result_str = print_error(ret.err_msg);
    }
 
    if (gearman_failed(gearman_job_send_data(job, result_str.c_str(), result_str.size())))
    {
        return GEARMAN_ERROR;
    }
 
    return GEARMAN_SUCCESS;
}
真正的任务处理其实就是bmiocr::ocr_engine_run这个调用。从ocr_worker的实现可以看到,提交给Worker的请求数据是包装在gearman_job_st结构中,然后通过gearman_job_workload方法取出数据指针,并通过gearman_job_workload_size得到数据长度,而Worker处理结果则是通过gearman_job_send_data发送给Job Server。
Worker写好后,编译运行,然后用php写个简单的Client,测试一下,工作正常!然后把Worker运行在其它机器上,也能正常工作,同时发现:如果一个Worker在任务处理过程中挂掉(比如强制结束程序),任务会转到其它Worker上去,Client感知不到;而如果是仅有的一个Worker在任务处理过程中挂掉了,任务仍然会在Job Server上保留,下一次有Worker连上的时候,任务会重新被发送到新的可用Worker上来。所以,任务会尽可能地被正确执行,并且对Client都是透明的。
Gearman还支持异步的任务进度查询,比如一个耗时任务,Worker在处理过程中可以同时向Job Server发送任务进度信息,Client也可以查询一个任务的进度信息。
综上,在Gearman框架的基础上,搭建一个分布式计算系统还是很轻松的!

评论