6. 下面我们看看主线程dispatch_thread的事件处理设置。在memcached.c中的main函数中 /* create unix mode sockets after dropping privileges */ if ( settings . socketpath ! = NULL ) { errno = 0 ; if ( server_socket_unix ( settings . socketpath ,
6. 下面我们看看主线程dispatch_thread的事件处理设置。在memcached.c中的main函数中 /* create unix mode sockets after dropping privileges */
if ( settings . socketpath ! = NULL ) {
errno = 0 ;
if ( server_socket_unix ( settings . socketpath , settings . access ) ) {
vperror ( "failed to listen on UNIX socket: %s" , settings . socketpath ) ;
exit ( EX_OSERR ) ;
}
}
/* create the listening socket, bind it, and init */
if ( settings . socketpath = = NULL ) {
int udp_port ;
const char * portnumber_filename = getenv ( "MEMCACHED_PORT_FILENAME" ) ;
char temp_portnumber_filename [ PATH_MAX ] ;
FILE * portnumber_file = NULL ;
if ( portnumber_filename ! = NULL ) {
snprintf ( temp_portnumber_filename ,
sizeof ( temp_portnumber_filename ) ,
"%s.lck" , portnumber_filename ) ;
portnumber_file = fopen ( temp_portnumber_filename , "a" ) ;
if ( portnumber_file = = NULL ) {
fprintf ( stderr , "Failed to open \"%s\": %s\n" ,
temp_portnumber_filename , strerror ( errno ) ) ;
}
}
errno = 0 ;
if ( settings . port & & server_socket ( settings . port , tcp_transport ,
portnumber_file ) ) {
vperror ( "failed to listen on TCP port %d" , settings . port ) ;
exit ( EX_OSERR ) ;
}
/*
* initialization order: first create the listening sockets
* (may need root on low ports), then drop root if needed,
* then daemonise if needed, then init libevent (in some cases
* descriptors created by libevent wouldn't survive forking).
*/
udp_port = settings . udpport ? settings . udpport : settings . port ;
/* create the UDP listening socket and bind it */
errno = 0 ;
if ( settings . udpport & & server_socket ( settings . udpport , udp_transport ,
portnumber_file ) ) {
vperror ( "failed to listen on UDP port %d" , settings . udpport ) ;
exit ( EX_OSERR ) ;
}
if ( portnumber_file ) {
fclose ( portnumber_file ) ;
rename ( temp_portnumber_filename , portnumber_filename ) ;
}
}
/* Drop privileges no longer needed */
drop_privileges ( ) ;
/* enter the event loop */
event_base_loop ( main_base , 0 ) ; //主线程(dispatcher_thread)的事件监听循环。。。
7. 继续跟踪server_socket函数(memcached.c中)
/**
* Create a socket and bind it to a specific port number
* @param port the port number to bind to
* @param transport the transport protocol (TCP / UDP)
* @param portnumber_file A filepointer to write the port numbers to
* when they are successfully added to the list of ports we
* listen on.
*/
static int server_socket ( int port , enum network_transport transport ,
FILE * portnumber_file ) {
int sfd ;
struct linger ling = { 0 , 0 } ;
struct addrinfo * ai ;
struct addrinfo * next ;
struct addrinfo hints = { . ai_flags = AI_PASSIVE ,
. ai_family = AF_UNSPEC } ;
char port_buf [ NI_MAXSERV ] ;
int error ;
int success = 0 ;
int flags = 1 ;
hints . ai_socktype = IS_UDP ( transport ) ? SOCK_DGRAM : SOCK_STREAM ;
if ( port = = - 1 ) {
port = 0 ;
}
snprintf ( port_buf , sizeof ( port_buf ) , "%d" , port ) ;
error = getaddrinfo ( settings . inter , port_buf , & hints , & ai ) ;
if ( error ! = 0 ) {
if ( error ! = EAI_SYSTEM )
fprintf ( stderr , "getaddrinfo(): %s\n" , gai_strerror ( error ) ) ;
else
perror ( "getaddrinfo()" ) ;
return 1 ;
}
for ( next = ai ; next ; next = next - > ai_next ) {
conn * listen_conn_add ;
if ( ( sfd = new_socket ( next ) ) = = - 1 ) { //creat socket
/* getaddrinfo can return "junk" addresses,
* we make sure at least one works before erroring.
*/
continue ;
}
# ifdef IPV6_V6ONLY
if ( next - > ai_family = = AF_INET6 ) {
error = setsockopt ( sfd , IPPROTO_IPV6 , IPV6_V6ONLY , ( char * ) & flags , sizeof ( flags ) ) ;
if ( error ! = 0 ) {
perror ( "setsockopt" ) ;
close ( sfd ) ;
continue ;
}
}
# endif
// setsockopt
setsockopt ( sfd , SOL_SOCKET , SO_REUSEADDR , ( void * ) & flags , sizeof ( flags ) ) ;
if ( IS_UDP ( transport ) ) {
maximize_sndbuf ( sfd ) ;
} else {
error = setsockopt ( sfd , SOL_SOCKET , SO_KEEPALIVE , ( void * ) & flags , sizeof ( flags ) ) ;
if ( error ! = 0 )
perror ( "setsockopt" ) ;
error = setsockopt ( sfd , SOL_SOCKET , SO_LINGER , ( void * ) & ling , sizeof ( ling ) ) ;
if ( error ! = 0 )
perror ( "setsockopt" ) ;
error = setsockopt ( sfd , IPPROTO_TCP , TCP_NODELAY , ( void * ) & flags , sizeof ( flags ) ) ;
if ( error ! = 0 )
perror ( "setsockopt" ) ;
}
//bind
if ( bind ( sfd , next - > ai_addr , next - > ai_addrlen ) = = - 1 ) {
if ( errno ! = EADDRINUSE ) {
perror ( "bind()" ) ;
close ( sfd ) ;
freeaddrinfo ( ai ) ;
return 1 ;
}
close ( sfd ) ;
continue ;
} else {
success + + ;
//TCP listen
if ( ! IS_UDP ( transport ) & & listen ( sfd , settings . backlog ) = = - 1 ) {
perror ( "listen()" ) ;
close ( sfd ) ;
freeaddrinfo ( ai ) ;
return 1 ;
}
if ( portnumber_file ! = NULL & &
( next - > ai_addr - > sa_family = = AF_INET | |
next - > ai_addr - > sa_family = = AF_INET6 ) ) {
union {
struct sockaddr_in in ;
struct sockaddr_in6 in6 ;
} my_sockaddr ;
socklen_t len = sizeof ( my_sockaddr ) ;
if ( getsockname ( sfd , ( struct sockaddr * ) & my_sockaddr , & len ) = = 0 ) {
if ( next - > ai_addr - > sa_family = = AF_INET ) {
fprintf ( portnumber_file , "%s INET: %u\n" ,
IS_UDP ( transport ) ? "UDP" : "TCP" ,
ntohs ( my_sockaddr . in . sin_port ) ) ;
} else {
fprintf ( portnumber_file , "%s INET6: %u\n" ,
IS_UDP ( transport ) ? "UDP" : "TCP" ,
ntohs ( my_sockaddr . in6 . sin6_port ) ) ;
}
}
}
}
if ( IS_UDP ( transport ) ) {
//UDP 的处理中不需要accept,所以直接派发connection到工作线程。
int c ;
for ( c = 0 ; c settings . num_threads ; c + + ) {
/* this is guaranteed to hit all threads because we round-robin */
dispatch_conn_new ( sfd , conn_read , EV_READ | EV_PERSIST ,
UDP_READ_BUFFER_SIZE , transport ) ;
}
} else {
//TCP的处理(注意,这里dispatcher_thread同样调用了conn_new来绑定conn_event到其main_base. 并且此时conn的初始状态为conn_listening, 事件为持久可读, 而在conn_new中注册了conn_event的回调函数为event_handler,所以,dispatche_thread在当前listen的socket可读时就会调用event_handler,进而调用driver_machine(c) 进入状态机。而在driver_machine中如果是主线程(dispatcher_thread)则会在accept socket后调用dispatch_new_conn函数来给各worker_thread派发connection...)
if ( ! ( listen_conn_add = conn_new ( sfd , conn_listening ,
EV_READ | EV_PERSIST , 1 ,
transport , main_base ) ) ) {
fprintf ( stderr , "failed to create listening connection\n" ) ;
exit ( EXIT_FAILURE ) ;
}
listen_conn_add - > next = listen_conn ;
listen_conn = listen_conn_add ;
}
}
freeaddrinfo ( ai ) ;
/* Return zero iff we detected no errors in starting up connections */
return success = = 0 ;
}
8. 看看UDP和TCP模式下dispatcher_thread都会调用的dispatch_new_conn函数 ( 在thread.c中)
/*
* Dispatches a new connection to another thread. This is only ever called
* from the main thread, either during initialization (for UDP) or because
* of an incoming connection.
*/
void dispatch_conn_new ( int sfd , enum conn_states init_state , int event_flags ,
int read_buffer_size , enum network_transport transport ) {
CQ_ITEM * item = cqi_new ( ) ;
int tid = ( last_thread + 1 ) % settings . num_threads ; //轮询的方式找worker_thread
LIBEVENT_THREAD * thread = threads + tid ;
last_thread = tid ;
item - > sfd = sfd ;
item - > init_state = init_state ;
item - > event_flags = event_flags ;
item - > read_buffer_size = read_buffer_size ;
item - > transport = transport ;
//push conn到worker_thread的CQ中
cq_push ( thread - > new_conn_queue , item ) ;
MEMCACHED_CONN_DISPATCH ( sfd , thread - > thread_id ) ;
if ( write ( thread - > notify_send_fd , "" , 1 ) ! = 1 ) {
perror ( "Writing to thread notify pipe" ) ;
}
}
9. 先看看unix domain socket模式下主线程的事件处理设置。(在上面的6中调用的 server_socket_unix函数 )
static int server_socket_unix ( const char * path , int access_mask ) {
int sfd ;
struct linger ling = { 0 , 0 } ;
struct sockaddr_un addr ;
struct stat tstat ;
int flags = 1 ;
int old_umask ;
if ( ! path ) {
return 1 ;
}
if ( ( sfd = new_socket_unix ( ) ) = = - 1 ) {
return 1 ;
}
/*
* Clean up a previous socket file if we left it around
*/
if ( lstat ( path , & tstat ) = = 0 ) {
if ( S_ISSOCK ( tstat . st_mode ) )
unlink ( path ) ;
}
setsockopt ( sfd , SOL_SOCKET , SO_REUSEADDR , ( void * ) & flags , sizeof ( flags ) ) ;
setsockopt ( sfd , SOL_SOCKET , SO_KEEPALIVE , ( void * ) & flags , sizeof ( flags ) ) ;
setsockopt ( sfd , SOL_SOCKET , SO_LINGER , ( void * ) & ling , sizeof ( ling ) ) ;
/*
* the memset call clears nonstandard fields in some impementations
* that otherwise mess things up.
*/
memset ( & addr , 0 , sizeof ( addr ) ) ;
addr . sun_family = AF_UNIX ;
strncpy ( addr . sun_path , path , sizeof ( addr . sun_path ) - 1 ) ;
assert ( strcmp ( addr . sun_path , path ) = = 0 ) ;
old_umask = umask ( ~ ( access_mask & 0777 ) ) ;
if ( bind ( sfd , ( struct sockaddr * ) & addr , sizeof ( addr ) ) = = - 1 ) {
perror ( "bind()" ) ;
close ( sfd ) ;
umask ( old_umask ) ;
return 1 ;
}
umask ( old_umask ) ;
if ( listen ( sfd , settings . backlog ) = = - 1 ) {
perror ( "listen()" ) ;
close ( sfd ) ;
return 1 ;
}
if ( ! ( listen_conn = conn_new ( sfd , conn_listening , //同样是调用conn_new
EV_READ | EV_PERSIST , 1 ,
local_transport , main_base ) ) ) {
fprintf ( stderr , "failed to create listening connection\n" ) ;
exit ( EXIT_FAILURE ) ;
}
return 0 ;
}
查看更多关于memcached探索之threadmodel(2)的详细内容...