文章归档

libprocess并发编程

libprocess是mesos中非常重要的一个基础库,提供一些很方便的helper函数以及并发编程所需要的基本原语,例如下面我将重点讲的future/promise。

为了更好的解释future/promise是什么,我抽取了一段mesos中的代码作为例子:

Future<Socket> PollSocketImpl::accept()
{
  return io::poll(get(), io::READ)
    .then(lambda::bind(&internal::accept, get()));
}

这个函数的基本作用是:使用io::poll()注册io::READ事件,并且当事件ready的时候,调用internal::accept()。

显然,这是一个异步的accept()方法。

为什么说是异步的呢?因为调用了accept()方法之后,你会立即得到一个Future<Socket>值,它不是一个真正的Socket变量,而是Future<Socket>,顾名思义,这是一个未来的值,现在没有任何意义。

那么这个值什么时候会ready呢?不知道,但你可以有三种方式,在将来某个时刻取得这个值:

  1. 调用Future<Socket>::get(),如果值还没有就绪,get()函数会一直阻塞,直到能拿到一个Socket值,然后返回
  2. 通过Future<Socket>::isReady()等API定期判断,一旦isReady()返回true,即可调用Future<Socket>::get(),get()函数立即返回
  3. 注册回调函数,告诉生产者,当值就绪之后,触发我注册的回调。Future提供多种回调方式,后面会讲到

也就是说,借助Future/Promise,生产者和消费者之间可以做到完全异步。

future和promise的定义

以下是我摘自cplusplus.com中future/promise的定义,libprocess的思想与此类似:

  1. A future is an object that can retrieve a value from some
    provider object or function, properly synchronizing this access if in
    different threads.
  2. A promise is an object that can store a value of typeT to be retrieved by a future object (possibly in another thread), offering a synchronization point.

你可以理解为这就是一个管道,future是读端,promise是写端。

future基本结构

future变量,主要有三个组成部分,状态,数据域,callbacks。由struct Data{} 结构维护,struct Data{}结构在future变量里是一个私有的成员变量,并且是shared_ptr<T>。

为什么是需要shared_ptr<T>呢?因为future本身是可以随意拷贝的,它既可以是函数的参数,也可以是函数的返回值,生产者和消费者同时持有同一个future变量,但数据是只有一份的,多线程之间访问需要互斥,这个统统由future的实现来完成,对开发者完全透明。

  struct Data
  {
    Data();
    ~Data();

    void clearAllCallbacks();

    int lock;
    State state;
    bool discard;
    bool associated;
    T* t;
    std::string* message; // Message associated with failure.
    std::vector<DiscardCallback> onDiscardCallbacks;
    std::vector<ReadyCallback> onReadyCallbacks;
    std::vector<FailedCallback> onFailedCallbacks;
    std::vector<DiscardedCallback> onDiscardedCallbacks;
    std::vector<AnyCallback> onAnyCallbacks;
  };

1. 状态

  enum State
  {
    PENDING,
    READY,
    FAILED,
    DISCARDED,
  };

future变量有4种状态,分别是:

  1. PENDING表示值还没有ready,这个时候生产者还没调用promise<T>.set(),而消费者调用future<T>.get()的话会处于阻塞状态
  2. READY表示值已经就绪,future<T>.get()会立即返回结果
  3. FAILED表示值未就绪,已被破坏。通常生产者在处理过程出现异常时会设置这个状态
  4. DISCARDED表示值被丢失。生产者缺少某些条件无法计算这个值

2. 数据

struct Data{}中的T* t用来保存数据,生产者通过promise<T>.set()将值保存到这里,然后触发相应的回调函数,消费者通过future<T>.get()从这里读取,如下:

set过程

template <typename T>
bool Promise<T>::set(const T& t)
{
  if (!f.data->associated) {
    return f.set(t);
  }
  return false;
}

template <typename T>
bool Future<T>::set(const T& _t)
{
  bool result = false;

  internal::acquire(&data->lock);
  {
    if (data->state == PENDING) {
      data->t = new T(_t);
      data->state = READY;
      result = true;
    }
  }
  internal::release(&data->lock);

  // Invoke all callbacks associated with this future being READY. We
  // don't need a lock because the state is now in READY so there
  // should not be any concurrent modications.
  if (result) {
    internal::run(data->onReadyCallbacks, *data->t);
    internal::run(data->onAnyCallbacks, *this);

    data->clearAllCallbacks();
  }

  return result;
}

get过程如下:

template <typename T>
const T& Future<T>::get() const
{
  if (!isReady()) {
    await();
  }

  CHECK(!isPending()) << "Future was in PENDING after await()";
  // We can't use CHECK_READY here due to check.hpp depending on future.hpp.
  if (!isReady()) {
    CHECK(!isFailed()) << "Future::get() but state == FAILED: " << failure();
    CHECK(!isDiscarded()) << "Future::get() but state == DISCARDED";
  }

  assert(data->t != NULL);
  return *data->t;
}

3. callbacks

future支持非常多类型的callbacks,针对每种不同的状态都可以设置相应的callback,如struct Data{}里可以看到如下字段:

    std::vector<DiscardCallback> onDiscardCallbacks;
    std::vector<ReadyCallback> onReadyCallbacks;
    std::vector<FailedCallback> onFailedCallbacks;
    std::vector<DiscardedCallback> onDiscardedCallbacks;
    std::vector<AnyCallback> onAnyCallbacks;

其中:

  1. 当生产者调用promise<T>.discard()时触发onDiscardCallbacks
  2. 当生产者调用promise<T>.set()时触发onReadyCallbacks
  3. 当生产者调用promise<T>.fail()时触发onFailedCallbacks
  4. 不管生产者触发何种动作,onAnyCallbacks都会触发

libprocess中的future<T>::then(f)

libprocess里的future实现支持then()方法,允许你通过future的callbacks来实现一些相当高级别的黑科技。例如,你的代码可以这样写:

Future<int> second(const bool& b)
{
  return b ? 1 : 0;
}

Future<string> third(const int& s)
{
  return s > 0 ? "good" : "bad";
}

TEST(Process, chain)
{
  Future<string> S = readyFuture()
    .then(lambda::bind(&second, lambda::_1))
    .then(lambda::bind(&third, lambda::_1));

  string s = S.get();
}

这里的意思是说,readyFuture()得到一个future变量A,当A就绪时,执行second(A)并得到一个future变量B,当B就绪时,执行third(B)得到一个future值S。上面的代码等同于:

TEST(Process, chain)
{
  Future<bool> A = oneFuture();
  Future<int> B = second(A.get());
  Future<string> S = third(B.get());

  string s = S.get();
}

注意,get()是同步的,会一直等到相应的future变量处于ready状态才返回。

可以看到,在第一种实现里,Future<string>是third()函数的返回值,而不是readyFuture()函数的返回值,注意返回值的类型。这段代码里,我们最终想要的,只是S的值,A/B对我们而言只是一个临时变量。既然如何,有什么办法能省掉它们呢?libprocess又是怎么做到的呢?

then的实现如下:

template <typename T>
template <typename X>
Future<X> Future<T>::then(const lambda::function<Future<X>(const T&)>& f) const
{
  memory::shared_ptr<Promise<X>> promise(new Promise<X>());

  lambda::function<void(const Future<T>&)> thenf =
    lambda::bind(&internal::thenf<T, X>, f, promise, lambda::_1);

  onAny(thenf);

  // Propagate discarding up the chain. To avoid cyclic dependencies,
  // we keep a weak future in the callback.
  promise->future().onDiscard(
      lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));

  return promise->future();
}

在这个函数里我们可以看到,实质上then只是创建了一个闭包thenf,这个闭包的上下文是:

  1. 当前的Future<T>
  2. f
  3. f 的返回值Future<X>对应的Promise<X>,闭包里只需要保存Promise<X>即可,Future<X>是作为函数返回值返回的,仅供caller读取
  4. internal::thenf<T, X> 函数

然后将这个闭包作为一个callbacks,注册到Future<T>里,当这个future有任何action时(ready/failed/discard),触发这个callback。这个callback所做的事情就是,调用f(T)得到Future<X>。但是问题又来了,因为thenf()必然是在Future<T>::then(f)之后某个时刻才执行的,这两个函数完全异步。但是Future<T>::then(f)函数返回时,caller拿到一个Future<X>,并希望从此得到期望的结果。但是thenf()执行时,说明T已经ready,所以调用f(T),也就是Future<T>::then(f)时注册的callback函数,f(T)也会产生一个Future<X>,但是此Future<X>非彼Future<X>,因为f(T)函数也可能是异步执行的。

显然,Future<T>::then(f)返回给caller的Future<X>是一个临时future,真正的Future<X>只有在f(T)函数被调用时才会得到,但是Future<T>::then()的caller显然并不之情,它只关心它所拿到的Future<X>,并期望从中得到结果。

这就需要一种手段,当Future<T>::then(f)注册的f函数被执行时,f的返回值能够直接透传给Future<T>::then(f)的caller,promise->associate()就是做这个事情的。

template <typename T, typename X>
void thenf(const lambda::function<Future<X>(const T&)>& f,
           const memory::shared_ptr<Promise<X>>& promise,
           const Future<T>& future)
{
  if (future.isReady()) {
    if (future.hasDiscard()) {
      promise->discard();
    } else {
      promise->associate(f(future.get()));
    }
  } else if (future.isFailed()) {
    promise->fail(future.failure());
  } else if (future.isDiscarded()) {
    promise->discard();
  }
}

promise是thenf闭包里的一个上下文,Future<T>::then(f)被调用之后,then()函数会创建一个临时的promise,并将promise关联的Future<X>返回给caller,也就是说,caller拿到的只是一个临时的Future<X>。当Future<T> ready之后,then(f)注册的f函数被执行,f(T)才返回一个真正的Future<X>,Promise<X>::associate()的作用就是当真正的Future<X> ready时,将其数据copy到自身关联的临时的Future<X>中去。

2 comments to libprocess并发编程

Leave a Reply

You can use these HTML tags

<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>