/* Back to the Résumé */

/* p2p_main.c: Jay Miller, 11-2001
 * Contains all primary functions for standard use of the p2p library.
 */

#include "p2p.h"

/*** globals - mostly needed to handle the signal-driven i/o *************/

/* Circular queue of received datagrams.  p2p_sig_io() fills the slot at
 * queue_put; p2p_main_loop() consumes the slot at queue_get.  The per-slot
 * buffers are allocated in p2p_server_main() and freed in p2p_stop(). */
static dgram dg[MAX_QUEUE];   /* datagram queue */
/* Histogram indexed by the number of datagrams queued during a single
 * p2p_sig_io() invocation; printed and reset by p2p_sig_hup(). */
static long dg_counter[MAX_QUEUE + 1];   /* diagnostics for queue usage */
/* Set to TRUE by p2p_stop(); tested by p2p_main_loop() to leave its loop.
 * NOTE(review): read and written from different contexts without being
 * volatile/atomic -- confirm this is acceptable on the target platform. */
static BOOL p2p_quit = FALSE;
/* Filled in by pthread_create() in p2p_server_main(); p2p_stop() sends
 * SIGIO to it with pthread_kill(). */
static pthread_t tid;  /* save the thread id so we can kill it later */

/* Receiving (bound) socket and sending socket, opened by
 * p2p_server_main() and p2p_client_main() and closed by p2p_stop(). */
static int fd_server;  /* keep descriptors around */
static int fd_client;

/* Queue bookkeeping: next slot to consume, next slot to fill, and the
 * number of filled slots.  queue_count is modified both by the SIGIO
 * handler and by the main loop. */
static int queue_get;  /* these must be available to i/o signal handler */
static int queue_put;
static int queue_count;

/* Forward declarations for the file-private handlers and thread body. */
static void p2p_sig_io(int);
static void p2p_sig_hup(int);
static void *p2p_main_loop(void *);

/*** public functions ****************************************************/

/* Library init.  Sets the key to be used for encryption, then calls two
 * private functions to start up the server and client halves.
 *    key : a 32-bit encryption key passed in by the caller.  This key
 *          must be the same for all p2p instances with which the current
 *          instance wishes to communicate.  If key == 0, no encryption
 *          will occur.
 *
 * Returns 0 on success or -1 on error.
 */
int p2p_init(crypt_t key)
{
   /* Install the key before the server thread is created, so that no
    * datagram can reach the receive path while the key is still unset. */
   p2p_setkey(key);

   /* p2p_server_main() reports failure with any non-zero value (it can
    * hand back pthread_create()'s positive error number), so compare
    * against 0 rather than -1. */
   if (p2p_server_main() != 0) {
      p2p_error(WARN, __LINE__, "Error in server_main().\n");
      return -1;
   }

   if (p2p_client_main() != 0) {
      p2p_error(WARN, __LINE__, "Error in client_main().\n");
      return -1;
   }

   return 0;
}

/* Packet sending function.  This is the one a user will call to send
 * data to another p2p library.
 *    dest   : destination of the remote p2p library in either
 *             dotted-decimal or a full hostname format.
 *    buf    : arbitrary data to send.
 *    length : length of buf in bytes.
 *
 * If an encryption key is set, the data is passed through p2p_encrypt()
 * first and the resulting buffer (and its possibly updated length) is
 * what goes on the wire.
 *
 * Returns 0 on success or -1 on error.
 */
int p2p_send(char *dest, char *buf, size_t length)
{
   char *payload = buf;     /* what is actually sent; plaintext by default */
   int encrypted = 0;       /* remember whether payload came from p2p_encrypt() */
   int ret = 0;
   struct hostent *hptr;
   struct sockaddr_in servaddr;

   bzero(&servaddr, sizeof(servaddr));
   servaddr.sin_family = AF_INET;
   servaddr.sin_port = htons(RECV_PORT);

   if ((hptr = gethostbyname(dest)) == NULL) {
      p2p_error(SYS, __LINE__, "Error in gethostbyname()");
      return -1;
   }
   memcpy(&servaddr.sin_addr, hptr->h_addr_list[0], sizeof(struct in_addr));

   /* If encryption is enabled, encrypt that buffer for sending. */
   if (p2p_iskeyset()) {
      if ((payload = p2p_encrypt(buf, &length)) == NULL)
         return -1;
      encrypted = 1;
   }

   if (sendto(fd_client, payload, length, 0,
         (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
      p2p_error(SYS, __LINE__, "Error in sendto()");
      ret = -1;
   }

   /* Release the encrypted buffer only if we really obtained one; do not
    * re-query the key state, which could have changed in the meantime. */
   if (encrypted)
      p2p_padding_free(payload);

   return ret;
}

/* Library cleanup.

 */
void p2p_stop(void)
{
   int i;

   /* Code to release the thread.  If p2p_quit is true, any signal will

    * cause the main server loop to break.  The SIGIO signal handler is

    * also prepared to handle a signal with no data on the socket. */
   p2p_quit = TRUE;
   if (pthread_kill(tid, SIGIO) != 0)
      p2p_error(WARN, __LINE__, "kill:");  
      
   for (i = 0; i < MAX_QUEUE; i++) {   /* free queue of buffers */
      free(dg[i].dg);
      free(dg[i].sa);
   }

   close(fd_client);
   close(fd_server);
}

/*** initialization functions ********************************************/ 

/* Initialize the server half.  We open the server-side socket, bind it
 * to an address, allocate the datagram queue, and spawn a thread so the
 * calling program can resume.
 *
 * On any failure everything acquired so far (queue buffers, socket) is
 * released again before returning.
 *
 * Returns 0 on success or a non-zero error code on error (-1 for socket,
 * bind and allocation failures, pthread_create()'s error number if the
 * thread could not be started).
 */
int p2p_server_main(void)
{
   int i, ret;
   int allocated = 0;   /* number of queue slots allocated by this call */
   struct sockaddr_in saddr;

   /* start up a standard server-side UDP socket */
   if ((fd_server = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
      p2p_error(SYS, __LINE__, "Error in socket()");
      return -1;
   }

   bzero(&saddr, sizeof(saddr));
   saddr.sin_family = AF_INET;
   saddr.sin_addr.s_addr = htonl(INADDR_ANY);   /* 32-bit field: htonl */
   saddr.sin_port = htons(RECV_PORT);

   if (bind(fd_server, (struct sockaddr *)&saddr, sizeof(saddr)) == -1) {
      p2p_error(SYS, __LINE__, "Error in bind()");
      ret = -1;
      goto fail_socket;
   }

   /* initialize the buffer queue and queue counters */
   for (i = 0; i < MAX_QUEUE; i++) {
      dg[i].dg = malloc(MAX_DGRAM);
      dg[i].sa = malloc(sizeof(saddr));
      dg[i].sa_len = (size_t)sizeof(saddr);
      allocated = i + 1;
      if (dg[i].dg == NULL || dg[i].sa == NULL) {
         p2p_error(SYS, __LINE__, "Error in malloc()");
         ret = -1;
         goto fail_queue;
      }
   }
   queue_get = queue_put = queue_count = 0;

   if ((ret = pthread_create(&tid, NULL, &p2p_main_loop, NULL)) != 0) {
      /* pthread_create() returns its error instead of setting errno;
       * publish it so the SYS-level report shows the right reason. */
      errno = ret;
      p2p_error(SYS, __LINE__, "Error in pthread_create().\n");
      goto fail_queue;
   }

   return 0;

fail_queue:
   /* free only the slots this call allocated; free(NULL) is harmless */
   for (i = 0; i < allocated; i++) {
      free(dg[i].dg);
      dg[i].dg = NULL;
      free(dg[i].sa);
      dg[i].sa = NULL;
   }
fail_socket:
   close(fd_server);
   fd_server = -1;
   return ret;
}

/* Initialize the client half: create the sending UDP socket and switch
 * on its SO_BROADCAST option.
 *
 * Returns 0 on success or -1 on error.
 */
int p2p_client_main(void)
{
   const int enable = 1;
   int rc;

   /* the client side gets its own UDP socket */
   fd_client = socket(AF_INET, SOCK_DGRAM, 0);
   if (fd_client == -1) {
      p2p_error(SYS, __LINE__, "Error in socket()");
      return fd_client;
   }

   /* allow this socket to send to broadcast addresses */
   rc = setsockopt(fd_client, SOL_SOCKET, SO_BROADCAST, &enable,
         sizeof(enable));
   if (rc == -1)
      p2p_error(SYS, __LINE__, "Error in setsockopt()");

   return rc;
}

/*** main server loop ****************************************************/

/* Body of the server thread created by p2p_server_main().
 *
 * Installs the SIGHUP and SIGIO handlers, switches fd_server to
 * signal-driven, non-blocking i/o, and then loops: sleep until the SIGIO
 * handler has queued at least one datagram (or p2p_quit is set), decrypt
 * the datagram at queue_get if a key is set, pass it to
 * p2p_recv_handler(), and advance the queue.
 *
 *    null : unused thread argument.
 *
 * Returns NULL once p2p_quit is observed.
 *
 * NOTE(review): sigprocmask() is used from a thread here; POSIX leaves
 * its behaviour unspecified in multi-threaded processes
 * (pthread_sigmask() is the threaded equivalent) -- confirm on the
 * target platform.
 */
static void *p2p_main_loop(void *null)
{
   sigset_t noblock_mask, block_mask, unblock_mask;

   /* nobody joins this thread, so detach it */
   pthread_detach(pthread_self());

   signal(SIGHUP, p2p_sig_hup);  /* register signal handlers */
   signal(SIGIO, p2p_sig_io);
   
   /* let the kernel know who owns this socket */
   fcntl(fd_server, F_SETOWN, getpid());

   /* Setup socket for signal-driven and non-blocking i/o.  It must be
    * non-blocking because POSIX signals do not queue.  Note that F_SETFL
    * is given exactly these two flags; the previous flags are not read
    * back and merged. */
   fcntl(fd_server, F_SETFL, O_ASYNC | O_NONBLOCK);

   sigemptyset(&noblock_mask);       /* init our three signal sets */
   sigemptyset(&block_mask);    /* so that we may block SIGIO */
   sigemptyset(&unblock_mask);
   
   /* We must be able to block SIGIO because queue_count is modified
    * both in this server loop and in the signal handler itself.  There
    * is a possible race condition if we do not restrict the handler.
    * The mask in effect before blocking is saved in unblock_mask. */
   sigaddset(&block_mask, SIGIO);   /* the signal we want to block */
   sigprocmask(SIG_BLOCK, &block_mask, &unblock_mask);   /* block SIGIO */

   for (;;) {  /* the main server loop */
      /* SIGIO is blocked here, so queue_count cannot change between the
       * test and the sigsuspend(); sigsuspend() waits with the empty
       * mask noblock_mask installed, i.e. with no signal blocked. */
      while (queue_count == 0 && p2p_quit == FALSE)
         sigsuspend(&noblock_mask);  /* return after any signal */
      
      /* restore the saved mask so new datagrams can be queued while this
       * one is being processed */
      sigprocmask(SIG_SETMASK, &unblock_mask, NULL);  /* unblock SIGIO */

      if (p2p_quit)
         break;

      /* decrypt the new datagram, retrieve the length of the plaintext */
      if (p2p_iskeyset()) {
         dg[queue_get].dg_len =
               p2p_decrypt(dg[queue_get].dg, dg[queue_get].dg_len);
      }

      /* hand the datagram and its sender address to the receive handler */
      p2p_recv_handler(dg[queue_get].dg, dg[queue_get].dg_len,
            (struct sockaddr_in *)dg[queue_get].sa, dg[queue_get].sa_len);

      if (++queue_get >= MAX_QUEUE)
         queue_get = 0;  /* it's a circular queue */

      /* queue_count is shared with the handler: only touch it with SIGIO
       * blocked, and stay blocked for the emptiness test at loop top */
      sigprocmask(SIG_BLOCK, &block_mask, &unblock_mask); /* block SIGIO */
      queue_count--;
   }
   return NULL;
}

/*** signal related functions ********************************************/

/* Signal handler for SIGIO.

 *

 * For each datagram delivered to our socket, the kernel posts a SIGIO

 * to our application's PID.  This function immediately queues the

 * datagrams so they may later be handled by the application.

 */
static void p2p_sig_io(int signo)
{
   ssize_t len;
   int num_read = 0;

   /* Because POSIX signals don't queue past 1, we must read the socket

    * until all data has been removed from it on each SIGIO.  We break

    * when our non-blocking socket receives an EWOULDBLOCK. */
   while (1) {
      if (queue_count >= MAX_QUEUE)
         p2p_error(WARN, __LINE__, "Queue size exceeds maximum!\n");

      /* read the datagram from the socket into the queue */
      len = recvfrom(fd_server, dg[queue_put].dg, MAX_DGRAM, 0,
            dg[queue_put].sa, &dg[queue_put].sa_len);
      
      if (len < 0) {
         if (errno == EWOULDBLOCK) {
            /* this error implies there is no further data on the socket */
            break;
         } else {
            /* this error implies something is wrong */
            p2p_error(SYS, __LINE__, "Error in recvfrom()");
         }
      }
      dg[queue_put].dg_len = len;

      num_read++;  /* diagnostics */
      queue_count++;
      if (++queue_put >= MAX_QUEUE)
         queue_put = 0;  /* it's a circular queue */
   }

   dg_counter[num_read]++;  /* diagnostics */  
   return;
}

/* Diagnostics handler for SIGHUP.  Prints the dg_counter histogram (how
 * many datagrams each SIGIO invocation queued) to stdout, one bucket per
 * line, and clears every bucket as it goes.  This is mostly here to tell
 * us if we need a bigger queue.
 *
 *    signo : signal number (unused).
 */
static void p2p_sig_hup(int signo)
{
   long *bucket = dg_counter;
   int idx = 0;

   /* dg_counter holds MAX_QUEUE + 1 buckets, hence the inclusive bound */
   while (idx <= MAX_QUEUE) {
      printf("dg_counter[%d] = %ld\n", idx, *bucket);
      *bucket = 0;
      bucket++;
      idx++;
   }
}

/*** utility functions ***************************************************/

/* Error handling for entire library.  Formats the message, appends the
 * text for errno when SYS is given, writes the result to stderr, and
 * terminates the process with exit(1) when FATAL is given.
 *   level : Some combination of WARN, SYS (for syscalls) or FATAL.
 *   line  : The line number of the error.
 *   fmt   : printf-style format string describing the error.
 */
void p2p_error(int level, int line, char *fmt, ...)
{
   /* Capture errno before anything else: the formatting calls below are
    * allowed to change it, and it is the caller's value we must report. */
   int errno_saved = errno;
   char buf[MAX_BUFFER];
   size_t used;
   va_list va;

   buf[0] = '\0';   /* keep buf a valid string even if formatting fails */

   va_start(va, fmt);
#ifdef HAVE_VSNPRINTF
   vsnprintf(buf, sizeof(buf), fmt, va);
#else
   vsprintf(buf, fmt, va);  /* not safe */
#endif
   va_end(va);

   if (level & SYS) {  /* error on a system call */
      used = strlen(buf);
      snprintf(buf + used, sizeof(buf) - used, ": %s",
            strerror(errno_saved));
   }
   fprintf(stderr, "%s error at line %d.\n%s.",
         (level & FATAL) ? "Fatal" : "Non-fatal", line, buf);
   if (level & FATAL)
      exit(1);
}

/*************************************************************************/



/* Back to the Résumé */