スキップしてメイン コンテンツに移動

新BitTorrentプロトコルµTPを実装するlibutpソースコードの概観

5月21日に、libutpソースコードがMITライセンスで公開された。libutpとは、µTP (Micro Transport Protocol)と呼ばれるプロトコルの実装ライブラリである(BitTorrent.orgではBEP 29として提案され、IETFではLEDBATとして提案された)。今回は、そのコードを見てみたい。

libutpを公開したのはBitTorrent, Inc.で、彼らが配布するBitTorrentクライアントμTorrentのバージョン1.8 betaがµTPを最初に実装した。BitTorrent, Inc.が元々持っていたオリジナルのBitTorrent実装(Mainline)は、Pythonで書かれたソフトウェアとして有名だが、μTorrentはC++で実装されたWindows用ソフトウェアである。μTorrentはスウェーデン人のLudvig Strigeus氏が開発したが、2006年にBitTorrent, Incに買収されている。μTorrentが登場したとき、備えている機能に比して、GUIアプリにしては(パックされている可能性を勘案しても)きわめて小さいバイナリサイズに感心した。プログラマの力量を誇示するかのようなコンパクトで尖ったソフトウェアだったので、後日の買収のニュースには、落ち着くところに落ち着いたなと感じたのを記憶している。

従来のBitTorrentによるファイル転送がTCPを経由して行われていたのに対し、µTPは、UDP上に構築されたプロトコルを用いる。既にDHTやトラッカーのプロトコルにはUDPが利用されていた(ただしμTorrentによる実装はかなり後になった)が、µTPは、トランスポートプロトコルとしてもUDP上に構築した独自プロトコルを利用する。

BitTorrentクライアントが実装したDHT(分散ハッシュテーブル)は、Kademliaというアルゴリズムの実装で、トラッカーに依存しないリソース探索が可能となった。Gnutellaライクな、しかしより構造化された分散キー管理により、トラッカーがダウンしていても目的のリソースを保持するピアのリストを得られる。ネットワークの単一障害点を無くすという建前だが、違法にアップロードされたデータを配布するトラッカーが検挙されつつあった情勢に対抗するものという見方もあった。

今回のµTP実装は、ネットワークの反応が悪くなってきた場合に、自動的に他のトラフィックに道を譲ることで輻輳を防ぐのが目的とされている。オンラインゲームなどのリアルタイム性が重視されるアプリケーションでは一定のレイテンシ以下にping値が収まっていることが要求されるが、TCPのFIFOキューではサイズの大きいP2Pパケットが転送される場合にジッターが発生しやすくサービスの品質に影響を及ぼす。現在のTCPでは、輻輳制御アルゴリズムとして、CUBIC TCP(Linux)、Compound TCP(Windows)が用いられている。µTPは、BitTorrentのアップロード時に起こりやすい輻輳を特に回避するという目的に適う制御アルゴリズムを、UDP上で実装したものということになる。現在、P2Pアプリケーションを対象としてISPによるトラフィックシェイピングが行われているが、これに対しての回答ともいえるだろう。

前置きは以上で、以下コードを見ていく。libutpは、Windows用のdllとして公開されており、VC9用のソリューションが添付されている。utp.defに公開されているCのAPI関数は以下である。

UTP_Create @1
UTP_SetCallbacks @2
UTP_SetSockopt @3
UTP_Connect @4
UTP_IsIncomingUTP @5
UTP_HandleICMP @6
UTP_Write @7
UTP_RBDrained @8
UTP_CheckTimeouts @9
UTP_GetPeerName @10
UTP_GetDelays @11
UTP_Close @12
inet_ntop @13
inet_pton @14


このうちinet_ntop/inet_ptonは、Windows VistaからWinsockに実装されているので、古いバージョンのWindows用の実装である。

とりあえずは、µTP接続を生成すると思われるUTP_Createを見てみよう。


// Create a UTP socket
UTPSocket *UTP_Create(SendToProc *send_to_proc, void *send_to_userdata, const struct sockaddr *addr, socklen_t addrlen)
{
UTPSocket *conn = (UTPSocket*)calloc(1, sizeof(UTPSocket));

g_current_ms = UTP_GetMilliseconds();

UTP_SetCallbacks(conn, NULL, NULL);
conn->our_hist.clear();
conn->their_hist.clear();
conn->rto = 3000;
conn->rtt_var = 800;
conn->seq_nr = 1;
conn->ack_nr = 0;
conn->max_window_user = 255 * PACKET_SIZE;
conn->addr = PackedSockAddr((const SOCKADDR_STORAGE*)addr, addrlen);
conn->send_to_proc = send_to_proc;
conn->send_to_userdata = send_to_userdata;
conn->ack_time = g_current_ms + 0x70000000;
conn->last_got_packet = g_current_ms;
conn->last_sent_packet = g_current_ms;
conn->last_measured_delay = g_current_ms + 0x70000000;
conn->last_rwin_decay = int32(g_current_ms) - MAX_WINDOW_DECAY;
conn->last_send_quota = g_current_ms;
conn->send_quota = PACKET_SIZE * 100;
conn->cur_window_packets = 0;
conn->fast_resend_seq_nr = conn->seq_nr;

// default to version 1
UTP_SetSockopt(conn, SO_UTPVERSION, 1);

// we need to fit one packet in the window
// when we start the connection
conn->max_window = conn->get_packet_size();
conn->state = CS_IDLE;

conn->outbuf.mask = 15;
conn->inbuf.mask = 15;

conn->outbuf.elements = (void**)calloc(16, sizeof(void*));
conn->inbuf.elements = (void**)calloc(16, sizeof(void*));

conn->idx = g_utp_sockets.Append(conn);

LOG_UTPV("0x%08x: UTP_Create", conn);

return conn;
}



// The uTP socket layer calls this to send UDP packets
typedef void SendToProc(void *userdata, const byte *p, size_t len, const struct sockaddr *to, socklen_t tolen);

SendToProcは、ユーザー定義のパケット送信関数で、UDPなのでsendtoを呼ぶことになるだろう。対象のソケットは、そのユーザー定義関数からアクセスできるところにあらかじめ開いておかなければならない。コネクションレスのUDPなのでソケットは使い回すことになる。対して、ここで作成しているUTPSocketオブジェクトは仮想的なソケットで、TCP風に対象アドレス毎に作成してg_utp_socketsグローバル配列に追加する。

ライブラリで利用する順序上はUTP_Connectを見るべきだが、単にSYNパケットを作成して送信するだけのAPIなので、ここで次に注目すべきはパケット受信時に呼ぶAPIであるUTP_IsIncomingUTPだ。


// Process a UDP packet from the network. This will process a packet for an existing connection,
// or create a new connection and call incoming_proc. Returns true if the packet was processed
// in some way, false if the packet did not appear to be uTP.
bool UTP_IsIncomingUTP(UTPGotIncomingConnection *incoming_proc,
SendToProc *send_to_proc, void *send_to_userdata,
const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen);


ネットワークから受け取ったデータをまずこのAPIに読ませ、µTPのパケットかどうかを判定する。


bool UTP_IsIncomingUTP(UTPGotIncomingConnection *incoming_proc,
SendToProc *send_to_proc, void *send_to_userdata,
const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
{
const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen);

if (len < sizeof(PacketFormat) && len < sizeof(PacketFormatV1)) {
LOG_UTPV("recv %s len:%u too small", addrfmt(addr, addrbuf), (uint)len);
return false;
}

const PacketFormat* p = (PacketFormat*)buffer;
const PacketFormatV1* p1 = (PacketFormatV1*)buffer;

const byte version = UTP_IsV1(p1);
const uint32 id = (version == 0) ? p->connid : uint32(p1->connid);

if (version == 0 && len < sizeof(PacketFormat)) {
LOG_UTPV("recv %s len:%u version:%u too small", addrfmt(addr, addrbuf), (uint)len, version);
return false;
}


生データbufferPacketFormatにキャストして、パケットの中身を見ていく。


for (size_t i = 0; i < g_utp_sockets.GetCount(); i++) {
UTPSocket *conn = g_utp_sockets[i];
//LOG_UTPV("Examining UTPSocket %s for %s and (seed:%u s:%u r:%u) for %u",
// addrfmt(conn->addr, addrbuf), addrfmt(addr, addrbuf2), conn->conn_seed, conn->conn_id_send, conn->conn_id_recv, id);
if (conn->addr != addr)
continue;

if (flags == ST_RESET && (conn->conn_id_send == id || conn->conn_id_recv == id)) {
LOG_UTPV("0x%08x: recv RST for existing connection", conn);
if (!conn->userdata || conn->state == CS_FIN_SENT) {
conn->state = CS_DESTROY;
} else {
conn->state = CS_RESET;
}
if (conn->userdata) {
conn->func.on_overhead(conn->userdata, false, len + conn->get_udp_overhead(),
close_overhead);
const int err = conn->state == CS_SYN_SENT ?
ECONNREFUSED :
ECONNRESET;
conn->func.on_error(conn->userdata, err);
}
return true;
} else if (flags != ST_SYN && conn->conn_id_recv == id) {
LOG_UTPV("0x%08x: recv processing", conn);
const size_t read = UTP_ProcessIncoming(conn, buffer, len);
if (conn->userdata) {
conn->func.on_overhead(conn->userdata, false,
(len - read) + conn->get_udp_overhead(),
header_overhead);
}
return true;
}
}


UTPSocketの接続先を比較して、受信パケットが既に管理中のUTPSocketに属する物であれば、ここで処理される。SYNパケット(ST_SYN)でない場合は、送信データの内容を受け取ったことになるので、UTP_ProcessIncoming関数で処理する。その後RSTパケットの処理が続いてから、最後はSYNパケットを受け取って、ピアからの接続要求を処理する。


if (incoming_proc) {
LOG_UTPV("Incoming connection from %s uTP version:%u", addrfmt(addr, addrbuf), version);

// Create a new UTP socket to handle this new connection
UTPSocket *conn = UTP_Create(send_to_proc, send_to_userdata, to, tolen);
// Need to track this value to be able to detect duplicate CONNECTs
conn->conn_seed = id;
// This is value that identifies this connection for them.
conn->conn_id_send = id;
// This is value that identifies this connection for us.
conn->conn_id_recv = id+1;
conn->ack_nr = seq_nr;
conn->seq_nr = UTP_Random();
conn->fast_resend_seq_nr = conn->seq_nr;

UTP_SetSockopt(conn, SO_UTPVERSION, version);
conn->state = CS_CONNECTED;

const size_t read = UTP_ProcessIncoming(conn, buffer, len, true);

LOG_UTPV("0x%08x: recv send connect ACK", conn);
conn->send_ack(true);

incoming_proc(send_to_userdata, conn);

// we report overhead after incoming_proc, because the callbacks are setup now
if (conn->userdata) {
// SYN
conn->func.on_overhead(conn->userdata, false, (len - read) + conn->get_udp_overhead(),
header_overhead);
// SYNACK
conn->func.on_overhead(conn->userdata, true, conn->get_overhead(),
ack_overhead);
}
}


incoming_procはアプリケーション定義コールバックで、ACK送信後に呼ばれる。

それでは、UTP_ProcessIncoming関数を見てみよう。


// Process an incoming packet
// syn is true if this is the first packet received. It will cut off parsing
// as soon as the header is done
size_t UTP_ProcessIncoming(UTPSocket *conn, const byte *packet, size_t len, bool syn = false)
{
UTP_RegisterRecvPacket(conn, len);

g_current_ms = UTP_GetMilliseconds();

conn->update_send_quota();

const PacketFormat *pf = (PacketFormat*)packet;
const PacketFormatV1 *pf1 = (PacketFormatV1*)packet;
const byte *packet_end = packet + len;

// snip

// mark receipt time
uint64 time = UTP_GetMicroseconds();

// snip

uint64 p;

if (conn->version == 0) {
p = uint64(pf->tv_sec) * 1000000 + pf->tv_usec;
} else {
p = pf1->tv_usec;
}

conn->last_measured_delay = g_current_ms;

// get delay in both directions
// record the delay to report back
const uint32 their_delay = (uint32)(p == 0 ? 0 : time - p);
conn->reply_micro = their_delay;
uint32 prev_delay_base = conn->their_hist.delay_base;
if (their_delay != 0) conn->their_hist.add_sample(their_delay);

// if their new delay base is less than their previous one
// we should shift our delay base in the other direction in order
// to take the clock skew into account
if (prev_delay_base != 0 &&
wrapping_compare_less(conn->their_hist.delay_base, prev_delay_base)) {
// never adjust more than 10 milliseconds
if (prev_delay_base - conn->their_hist.delay_base <= 10000) {
conn->our_hist.shift(prev_delay_base - conn->their_hist.delay_base);
}
}

const uint32 actual_delay = conn->version==0
?(pf->reply_micro==INT_MAX?0:(uint)pf->reply_micro)
:(uint)pf1->reply_micro;

assert(conn->our_hist.get_value() >= 0);
// if the actual delay is 0, it means the other end
// hasn't received a sample from us yet, and doesn't
// know what it is. We can't update out history unless
// we have a true measured sample
prev_delay_base = conn->our_hist.delay_base;
if (actual_delay != 0) conn->our_hist.add_sample(actual_delay);
assert(conn->our_hist.get_value() >= 0);

// if our new delay base is less than our previous one
// we should shift the other end's delay base in the other
// direction in order to take the clock skew into account
// This is commented out because it creates bad interactions
// with our adjustment in the other direction. We don't really
// need our estimates of the other peer to be very accurate
// anyway. The problem with shifting here is that we're more
// likely shift it back later because of a low latency. This
// second shift back would cause us to shift our delay base
// which then get's into a death spiral of shifting delay bases
/* if (prev_delay_base != 0 &&
wrapping_compare_less(conn->our_hist.delay_base, prev_delay_base)) {
// never adjust more than 10 milliseconds
if (prev_delay_base - conn->our_hist.delay_base <= 10000) {
conn->their_hist.Shift(prev_delay_base - conn->our_hist.delay_base);
}
}
*/
// only apply the congestion controller on acks
// if we don't have a delay measurement, there's
// no point in invoking the congestion control
if (actual_delay != 0 && acked_bytes >= 1)
conn->apply_ledbat_ccontrol(acked_bytes, actual_delay, min_rtt);


UTP_GetMicrosecondsは、QueryPerformanceCounterを用いて(monotonicになるように修正した上で)マイクロ秒単位の時間を取得する。それと、パケットに記録されている時間との差分を取っている。

ここでわかるのは、アップロードされた送信パケットが実際に着信するまでにかかった時間(their_delay)をreply_microとして保存し、返信して、それを受け取った側で遅延(actual_delay)として解釈しているということだ。つまり、パケットが行って帰ってくるまでのラウンドトリップタイム(RTT)ではなく、アップロード時の遅延を計測して輻輳を検出しようとしている。この後、apply_ledbat_ccontrol関数によって、場合によってはウィンドウサイズを変化させて他のTCP接続を優先することになる。


void UTPSocket::apply_ledbat_ccontrol(size_t bytes_acked, uint32 actual_delay, int64 min_rtt)
{
// the delay can never be greater than the rtt. The min_rtt
// variable is the RTT in microseconds

assert(min_rtt >= 0);
int32 our_delay = min<uint32>(our_hist.get_value(), uint32(min_rtt));
assert(our_delay != INT_MAX);
assert(our_delay >= 0);
assert(our_hist.get_value() >= 0);

SOCKADDR_STORAGE sa = addr.get_sockaddr_storage();
UTP_DelaySample((sockaddr*)&sa, our_delay / 1000);

// This test the connection under heavy load from foreground
// traffic. Pretend that our delays are very high to force the
// connection to use sub-packet size window sizes
//our_delay *= 4;

// target is microseconds
int target = CCONTROL_TARGET;
if (target <= 0) target = 100000;

double off_target = target - our_delay;

// this is the same as:
//
// (min(off_target, target) / target) * (bytes_acked / max_window) * MAX_CWND_INCREASE_BYTES_PER_RTT
//
// so, it's scaling the max increase by the fraction of the window this ack represents, and the fraction
// of the target delay the current delay represents.
// The min() around off_target protects against crazy values of our_delay, which may happen when th
// timestamps wraps, or by just having a malicious peer sending garbage. This caps the increase
// of the window size to MAX_CWND_INCREASE_BYTES_PER_RTT per rtt.
// as for large negative numbers, this direction is already capped at the min packet size further down
// the min around the bytes_acked protects against the case where the window size was recently
// shrunk and the number of acked bytes exceeds that. This is considered no more than one full
// window, in order to keep the gain within sane boundries.

assert(bytes_acked > 0);
double window_factor = (double)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked);
double delay_factor = off_target / target;
double scaled_gain = MAX_CWND_INCREASE_BYTES_PER_RTT * window_factor * delay_factor;

// since MAX_CWND_INCREASE_BYTES_PER_RTT is a cap on how much the window size (max_window)
// may increase per RTT, we may not increase the window size more than that proportional
// to the number of bytes that were acked, so that once one window has been acked (one rtt)
// the increase limit is not exceeded
// the +1. is to allow for floating point imprecision
assert(scaled_gain <= 1. + MAX_CWND_INCREASE_BYTES_PER_RTT * (int)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked));

if (scaled_gain > 0 && g_current_ms - last_maxed_out_window > 300) {
// if it was more than 300 milliseconds since we tried to send a packet
// and stopped because we hit the max window, we're most likely rate
// limited (which prevents us from ever hitting the window size)
// if this is the case, we cannot let the max_window grow indefinitely
scaled_gain = 0;
}

if (scaled_gain + max_window < MIN_WINDOW_SIZE) {
max_window = MIN_WINDOW_SIZE;
} else {
max_window = (size_t)(max_window + scaled_gain);
}

// make sure that the congestion window is below max
// make sure that we don't shrink our window too small
max_window = clamp<size_t>(max_window, MIN_WINDOW_SIZE, opt_sndbuf);


CCONTROL_TARGETは今のところ100 * 1000µs = 100msで、100msを下回っていれば目標のレイテンシが達成できているということになる。ターゲットから外れている数値(off_target)に応じてscaled_gainを算出し、100msよりレイテンシが低ければウィンドウサイズ(max_window)を大きくし、100msを超えていたら輻輳とみなしてウィンドウサイズを小さくし、別のアプリケーションに道を譲るという輻輳制御が行われる。

この後、UTP_ProcessIncomingでは、Selective ACKという処理でパケットロスを判定する。


// count bytes acked by EACK
if (selack_ptr != NULL) {
acked_bytes += conn->selective_ack_bytes((pk_ack_nr + 2) & ACK_NR_MASK,
selack_ptr, selack_ptr[-1], min_rtt);
}



void UTPSocket::selective_ack(uint base, const byte *mask, byte len)
{
// snip

bool back_off = false;
int i = 0;
while (nr > 0) {
uint v = resends[--nr];
// don't consider the tail of 0:es to be lost packets
// only unacked packets with acked packets after should
// be considered lost
OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);

// this may be an old (re-ordered) packet, and some of the
// packets in here may have been acked already. In which
// case they will not be in the send queue anymore
if (!pkt) continue;

// used in parse_log.py
LOG_UTP("0x%08x: Packet %u lost. Resending", this, v);

// On Loss
back_off = true;
#ifdef _DEBUG
++_stats._rexmit;
#endif
send_packet(pkt);
fast_resend_seq_nr = v + 1;

// Re-send max 4 packets.
if (++i >= 4) break;
}

if (back_off)
maybe_decay_win();

duplicate_ack = count;
}


同じACKを3回受け取った場合、輻輳が激しくなってパケットロスが起きたとみなし、パケット再送を促すとともに、maybe_decay_winを呼んでウィンドウサイズを縮小する。ここはTCP Reno同様である。

µTPの基本的な動作は、以上である。一見する限り、これでμTorrentを起動していてもWebブラウザを含む他のネットワークアプリの動作が重くならなくなるのであれば、万歳といったところだ。ところが、torrentコミュニティの中では、事実上μTorrentが(µTPをサポートする)μTorrentのみをピアとして優先するようになり他のクライアントが差別されかねないことや、µTP適用で絶対的転送速度が下がる可能性があることを危惧して、µTPに対しての批判や慎重論も根強いようである。とはいえ、μTorrentが現在非常に大きなシェアを占めていることは事実であり、µTPを実装するクライアントが増加するのも時間の問題だろう。BitCometXunleiといった中国で人気のあるクライアントが追随すれば、さらに趨勢は決定的となる。

コメント