[cig-commits] r8350 - in cs/portal/trunk/hello: . hello hello/templates/hello

leif at geodynamics.org leif at geodynamics.org
Wed Nov 28 17:27:39 PST 2007


Author: leif
Date: 2007-11-28 17:27:38 -0800 (Wed, 28 Nov 2007)
New Revision: 8350

Added:
   cs/portal/trunk/hello/daemon.c
   cs/portal/trunk/hello/hello/templates/hello/simulation_list.txt
Modified:
   cs/portal/trunk/hello/hello/urls.py
Log:
Wrote another simple daemon for the "hello world" portal.  For fun, I
wrote this one in plain old C.  The HW portal now exports "list.txt"
(in addition to "list.py") to facilitate parsing using fscanf().

Currently, each simulation run by this daemon is a no-op: it simply
marches each simulation through the various states -- POSTing the
status to the portal -- without actually executing any Globus
commands.


Added: cs/portal/trunk/hello/daemon.c
===================================================================
--- cs/portal/trunk/hello/daemon.c	2007-11-29 00:15:05 UTC (rev 8349)
+++ cs/portal/trunk/hello/daemon.c	2007-11-29 01:27:38 UTC (rev 8350)
@@ -0,0 +1,356 @@
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+
+#include <assert.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+
+/*
+ * Compile-time configuration.  Each value below is only a default; it
+ * can be overridden by defining the macro on the compiler command line.
+ */
+
+/* Argument passed to sleep() at the end of each main-loop iteration. */
+#ifndef SLEEP_INTERVAL
+#define SLEEP_INTERVAL 1
+#endif
+
+/* Host name of the portal's web server; resolved once in main(). */
+#ifndef PORTAL_HOSTNAME
+#define PORTAL_HOSTNAME "localhost"
+#endif
+
+/* TCP port of the portal's web server (host byte order). */
+#ifndef PORTAL_HTTP_PORT
+#define PORTAL_HTTP_PORT 8000
+#endif
+
+/* Path prefix prepended to every URL requested from the portal. */
+#ifndef PORTAL_URL_ROOT
+#define PORTAL_URL_ROOT "/hello"
+#endif
+
+
+/*
+ * State of a simulation as tracked by this daemon.  statusStr() maps
+ * each value to the string that is POSTed to the portal.
+ */
+typedef enum SimStatus {
+    /* pseudo status code -- "NEW" is never actually reported by Globus */
+    SIM_STATUS_NEW     = 0,
+    /* picked up by the daemon; reported to the portal as "pending" */
+    SIM_STATUS_PENDING = 1,
+    /* reported to the portal as "active" */
+    SIM_STATUS_ACTIVE  = 2,
+    /* final state, reported as "done" */
+    SIM_STATUS_DONE    = 3,
+    /* final state, reported as "failed" */
+    SIM_STATUS_FAILED  = 4
+} SimStatus;
+
+/*
+ * One simulation known to the daemon.  Records form a singly linked
+ * list; scan_simulation_list() pushes new records onto its head.
+ */
+typedef struct Simulation {
+    /* next record in the list, or NULL at the tail */
+    struct Simulation *next;
+    /* ID in server database */
+    unsigned int id;
+    /* current state; see SimStatus */
+    SimStatus status;
+    /* number of MPI processes */
+    unsigned int nodes;
+    /* passed to 'globus-job-status' */
+    char *globusJobId;
+    /* child process ID of GASS server */
+    pid_t gassServer;
+    /* main-loop iterations counted since this record was last monitored */
+    unsigned int sleepTimer;
+    /* the record is monitored once sleepTimer exceeds this value */
+    unsigned int stagnancy;
+} Simulation;
+
+
+/*------------------------------------------------------------------------
+  HTTP utility routines
+  ------------------------------------------------------------------------*/
+
+
+/*
+ * Open a TCP connection to the portal and wrap it in two stdio streams.
+ *
+ *   serv_addr  address of the portal's HTTP server
+ *   sockin     receives a stream opened for reading on the socket
+ *   sockout    receives a stream opened for writing on a dup() of the
+ *              socket, so that each stream owns its own descriptor
+ *
+ * Returns 0 on success; the caller then owns both streams and must
+ * fclose() each of them.  On failure a diagnostic is printed with
+ * perror(), everything acquired so far is released, *sockin and
+ * *sockout are set to NULL, and -1 is returned.
+ */
+int connect_to_portal(struct sockaddr_in *serv_addr, FILE **sockin, FILE **sockout)
+{
+    int sockfd, newsockfd;
+
+    /* Never leave the out-parameters indeterminate on an error path. */
+    *sockin = NULL;
+    *sockout = NULL;
+
+    sockfd = socket(AF_INET, SOCK_STREAM, 0);
+    if (sockfd == -1) {
+        perror("socket");
+        return -1;
+    }
+    if (connect(sockfd, (struct sockaddr *)serv_addr, sizeof(*serv_addr)) == -1) {
+        perror("connect");
+        close(sockfd);
+        return -1;
+    }
+
+    *sockin = fdopen(sockfd, "r");
+    if (!*sockin) {
+        perror("fdopen");
+        close(sockfd);
+        return -1;
+    }
+
+    /* From here on sockfd belongs to *sockin; fclose() releases it. */
+    newsockfd = dup(sockfd);
+    if (newsockfd == -1) {
+        perror("dup");
+        goto fail_close_sockin;
+    }
+    *sockout = fdopen(newsockfd, "w");
+    if (!*sockout) {
+        perror("fdopen");
+        close(newsockfd);
+        goto fail_close_sockin;
+    }
+
+    return 0;
+
+fail_close_sockin:
+    fclose(*sockin);
+    *sockin = NULL;
+    return -1;
+}
+
+
+/*
+ * Write an HTTP/1.0 GET request for 'path' (request line followed by an
+ * empty line, no headers) to 'sockout' and flush the stream.
+ *
+ * Returns 0 on success, -1 if formatting or flushing fails.
+ */
+int get(const char *path, FILE *sockout)
+{
+    if (fprintf(sockout, "GET %s HTTP/1.0\r\n\r\n", path) < 0) {
+        return -1;
+    }
+    return (fflush(sockout) == 0) ? 0 : -1;
+}
+
+
+/*
+ * Write an HTTP/1.0 POST request for 'path' to 'sockout' and flush it.
+ *
+ *   content         request body, sent as application/x-www-form-urlencoded
+ *   content_length  number of bytes of 'content' to send
+ *
+ * Exactly 'content_length' bytes are written, so 'content' need not be
+ * NUL-terminated and the body always agrees with the Content-length
+ * header.
+ *
+ * Returns 0 on success, -1 if any write or the flush fails.
+ */
+int post(const char *content, size_t content_length, const char *path, FILE *sockout)
+{
+    int ret;
+
+    ret = fprintf(sockout,
+                  "POST %s HTTP/1.0\r\n"
+                  "Content-type: application/x-www-form-urlencoded\r\n"
+                  "Content-length: %lu\r\n"
+                  "\r\n",
+                  path,
+                  (unsigned long)content_length);
+    if (ret < 0) {
+        return -1;
+    }
+
+    /* Send the body by length rather than with "%s". */
+    if (content_length > 0 &&
+        fwrite(content, 1, content_length, sockout) != content_length) {
+        return -1;
+    }
+
+    if (fflush(sockout) != 0) {
+        return -1;
+    }
+
+    return 0;
+}
+
+
+/*
+ * Format a request body printf-style and POST it to 'path' on 'sockout'.
+ *
+ *   format, ...  printf-style format string and its arguments
+ *
+ * The body is first written to a temporary file to learn its length,
+ * then read back into a NUL-terminated heap buffer and handed to
+ * post().  Nothing is sent if the body cannot be built; a diagnostic
+ * is printed to stderr instead.  The result of post() is not reported
+ * to the caller.
+ */
+void fpostf(const char *path, FILE *sockout, char *format, ...)
+{
+    va_list ap;
+    char *content = NULL;
+    long length;
+    int written;
+    FILE *s;
+
+    /* write content to a temporary file to determine its length */
+    s = tmpfile();
+    if (!s) {
+        perror("tmpfile");
+        return;
+    }
+    va_start(ap, format);
+    written = vfprintf(s, format, ap);
+    va_end(ap);
+    length = ftell(s);
+    if (written < 0 || length < 0) {
+        fprintf(stderr, "fpostf: cannot format request body\n");
+        goto done;
+    }
+
+    /* allocate and initialize buffer (one extra byte for the NUL) */
+    content = (char *)malloc((size_t)length + 1);
+    if (!content) {
+        perror("malloc");
+        goto done;
+    }
+    rewind(s);
+    if (fread(content, 1, (size_t)length, s) != (size_t)length) {
+        fprintf(stderr, "fpostf: cannot read back request body\n");
+        goto done;
+    }
+    content[length] = '\0';
+
+    /* post */
+    post(content, (size_t)length, path, sockout);
+
+done:
+    free(content);
+    fclose(s);
+}
+
+
+/*
+ * Consume the header section of an HTTP response from 'sockin', leaving
+ * the stream positioned at the first byte of the body.
+ *
+ * Reading stops after the first empty line ("\r\n", or a bare "\n") or
+ * at end of file.  Header lines longer than the local buffer arrive
+ * from fgets() in several pieces; only a piece that begins a new line
+ * can be the empty line, so the tail of a long header is never mistaken
+ * for the end of the headers.
+ */
+void skip_http_headers(FILE *sockin)
+{
+    char line[80];
+    size_t len;
+    int at_line_start = 1;
+
+    while (fgets(line, sizeof(line), sockin)) {
+        if (at_line_start &&
+            (strcmp(line, "\r\n") == 0 || strcmp(line, "\n") == 0)) {
+            break;
+        }
+        /* The next piece starts a new line only if this one ended one. */
+        len = strlen(line);
+        at_line_start = (len > 0 && line[len - 1] == '\n');
+    }
+}
+
+
+/*------------------------------------------------------------------------
+  simulation monitoring
+  ------------------------------------------------------------------------*/
+
+
+/*
+ * Return the string reported to the portal for 'status'.  The result
+ * points to a string literal and must not be freed.
+ *
+ * A value outside SimStatus trips assert(0); when assertions are
+ * compiled out, "xxx" is returned instead.
+ */
+const char *statusStr(SimStatus status)
+{
+    const char *label = NULL;
+
+    /* map status codes to strings displayed to the user in the portal */
+    if (status == SIM_STATUS_NEW) {
+        label = "new";
+    } else if (status == SIM_STATUS_PENDING) {
+        label = "pending";
+    } else if (status == SIM_STATUS_ACTIVE) {
+        label = "active";
+    } else if (status == SIM_STATUS_DONE) {
+        label = "done";
+    } else if (status == SIM_STATUS_FAILED) {
+        label = "failed";
+    }
+
+    if (!label) {
+        assert(0);
+        label = "xxx";
+    }
+    return label;
+}
+
+
+/*
+ * Report the current status of 'sim' to the portal.
+ *
+ * Opens a fresh connection, POSTs "status=<string>" to
+ * PORTAL_URL_ROOT/simulations/<id>/status/, echoes the request and the
+ * first line of the response body to stdout, and closes the
+ * connection.  If the connection cannot be opened the function returns
+ * without posting anything.
+ */
+void post_status(Simulation *sim, struct sockaddr_in *serv_addr)
+{
+    FILE *sockin, *sockout;
+    const char *status;
+    /*
+     * Sized from the format itself so that no PORTAL_URL_ROOT can
+     * overflow it: the literal text (sizeof includes the NUL) plus room
+     * for the ID, which needs at most three decimal digits per byte.
+     */
+    char path[sizeof(PORTAL_URL_ROOT "/simulations//status/") + 3 * sizeof(unsigned int)];
+    char line[80];
+
+    if (-1 == connect_to_portal(serv_addr, &sockin, &sockout)) {
+        return;
+    }
+
+    status = statusStr(sim->status);
+    snprintf(path, sizeof(path), PORTAL_URL_ROOT "/simulations/%u/status/", sim->id);
+    printf("POST %s status=%s\n", path, status);
+    fpostf(path, sockout, "status=%s", status);
+
+    skip_http_headers(sockin);
+    if (fgets(line, sizeof(line), sockin)) {
+        printf("response: %s\n", line);
+    }
+
+    fclose(sockin);
+    fclose(sockout);
+}
+
+
+/*
+ * Advance 'sim' by one step and report the new status to the portal:
+ * PENDING becomes ACTIVE, ACTIVE becomes DONE.  Simulations in any
+ * other state are left untouched and nothing is posted.
+ */
+void monitor_simulation(Simulation *sim, struct sockaddr_in *serv_addr)
+{
+    SimStatus successor;
+
+    if (sim->status == SIM_STATUS_PENDING) {
+        successor = SIM_STATUS_ACTIVE;
+    } else if (sim->status == SIM_STATUS_ACTIVE) {
+        successor = SIM_STATUS_DONE;
+    } else {
+        /* NEW, DONE and FAILED: nothing to do */
+        return;
+    }
+
+    sim->status = successor;
+    post_status(sim, serv_addr);
+}
+
+
+/*------------------------------------------------------------------------
+  main processing loop
+  ------------------------------------------------------------------------*/
+
+
+/*
+ * Parse the portal's "list.txt" response from 'sockin' and push a
+ * Simulation record onto the head of '*simList' for every entry whose
+ * status is "new".
+ *
+ * After the HTTP headers, each entry is read as "<id> <status> <nodes>";
+ * parsing stops at the first entry that does not match that form or at
+ * end of input.  Entries with any other status are ignored.  If memory
+ * runs out, a diagnostic is printed and the rest of the list is left
+ * unread.
+ */
+void scan_simulation_list(FILE *sockin, Simulation **simList)
+{
+    unsigned int id; char status[32]; unsigned int nodes;
+    Simulation *sim;
+
+    skip_http_headers(sockin);
+    while (fscanf(sockin, "%u %31s %u\n", &id, status, &nodes) == 3) {
+        if (strcmp(status, "new") != 0) {
+            continue;
+        }
+
+        /* new simulation */
+        sim = (Simulation *)malloc(sizeof(Simulation));
+        if (!sim) {
+            perror("malloc");
+            break;
+        }
+        sim->next = *simList;
+        sim->id = id;
+        sim->status = SIM_STATUS_NEW;
+        sim->nodes = nodes;
+        sim->globusJobId = NULL;
+        sim->gassServer = (pid_t)-1;
+        sim->sleepTimer = 0;
+        sim->stagnancy = 0;
+
+        *simList = sim;
+    }
+}
+
+
+/*
+ * Placeholder called at the top of every main-loop iteration.  It
+ * currently does nothing with 'simList'.
+ */
+void reap_children(Simulation *simList)
+{
+    (void)simList;
+}
+
+
+/*
+ * Main processing loop; never returns.
+ *
+ * Each iteration:
+ *   1. calls reap_children();
+ *   2. fetches PORTAL_URL_ROOT/simulations/list.txt and adds a record
+ *      for every simulation listed as "new";
+ *   3. marks each new record PENDING and reports that to the portal;
+ *   4. monitors every record whose sleepTimer has exceeded its
+ *      stagnancy, and increments the timer of the others;
+ *   5. sleeps for SLEEP_INTERVAL.
+ *
+ * If the portal cannot be reached, step 2 is skipped for that
+ * iteration and the loop carries on.
+ */
+void watch_for_simulations(struct sockaddr_in *serv_addr)
+{
+    FILE *sockin, *sockout;
+    Simulation *sim = NULL, *simList = NULL;
+
+    for (;;) {
+
+        reap_children(simList);
+
+        /* The streams exist only if the connection succeeded. */
+        if (connect_to_portal(serv_addr, &sockin, &sockout) != -1) {
+            if (get(PORTAL_URL_ROOT "/simulations/list.txt", sockout) != -1) {
+                scan_simulation_list(sockin, &simList);
+            }
+            fclose(sockin);
+            fclose(sockout);
+        }
+
+        /*
+         * start new simulations -- new records are pushed onto the head
+         * of the list, so they form a prefix of it
+         */
+        for (sim = simList; sim && sim->status == SIM_STATUS_NEW; sim = sim->next) {
+            /*
+             * Tell the server we've processed this simulation.  This
+             * also prevents us from creating duplicate Simulation
+             * records on the next iteration of this loop!
+             */
+            sim->status = SIM_STATUS_PENDING;
+            post_status(sim, serv_addr);
+
+            /* Wait two ticks before checking this Simulation again. */
+            sim->stagnancy = 2;
+        }
+
+        /* monitor old simulations */
+        for (sim = simList; sim; sim = sim->next) {
+            if (sim->sleepTimer > sim->stagnancy) {
+                monitor_simulation(sim, serv_addr);
+                sim->sleepTimer = 0;
+            } else {
+                ++sim->sleepTimer;
+            }
+        }
+
+        sleep(SLEEP_INTERVAL);
+    }
+}
+
+
+/*------------------------------------------------------------------------
+  main program
+  ------------------------------------------------------------------------*/
+
+
+/*
+ * Resolve PORTAL_HOSTNAME to an IPv4 address, build the portal's socket
+ * address from it and PORTAL_HTTP_PORT, and enter the main loop.
+ *
+ * Exits with EXIT_FAILURE if the host name cannot be resolved to an
+ * IPv4 address.
+ */
+int main(void)
+{
+    struct hostent *he;
+    struct sockaddr_in serv_addr;
+
+    he = gethostbyname(PORTAL_HOSTNAME);
+    if (!he) {
+        herror("gethostbyname");
+        return EXIT_FAILURE;
+    }
+    if (he->h_addrtype != AF_INET || !he->h_addr_list[0]) {
+        fprintf(stderr, "gethostbyname: no IPv4 address for %s\n", PORTAL_HOSTNAME);
+        return EXIT_FAILURE;
+    }
+
+    memset(&serv_addr, 0, sizeof(serv_addr));
+    serv_addr.sin_family = AF_INET;
+    serv_addr.sin_port = htons(PORTAL_HTTP_PORT);
+    /* use the first address returned for the host */
+    memcpy(&serv_addr.sin_addr, he->h_addr_list[0], sizeof(serv_addr.sin_addr));
+
+    watch_for_simulations(&serv_addr);
+    return 0;
+}
+
+
+/* end of file */

Added: cs/portal/trunk/hello/hello/templates/hello/simulation_list.txt
===================================================================
--- cs/portal/trunk/hello/hello/templates/hello/simulation_list.txt	2007-11-29 00:15:05 UTC (rev 8349)
+++ cs/portal/trunk/hello/hello/templates/hello/simulation_list.txt	2007-11-29 01:27:38 UTC (rev 8350)
@@ -0,0 +1,2 @@
+{# One simulation per line: "id status nodes", the layout daemon.c reads with fscanf(). #}{% for object in object_list %}
+{{ object.id }} {{ object.status }} {{ object.nodes }}{% endfor %}

Modified: cs/portal/trunk/hello/hello/urls.py
===================================================================
--- cs/portal/trunk/hello/hello/urls.py	2007-11-29 00:15:05 UTC (rev 8349)
+++ cs/portal/trunk/hello/hello/urls.py	2007-11-29 01:27:38 UTC (rev 8350)
@@ -39,6 +39,11 @@
                                                                                     allow_empty = True,
                                                                                     template_name = APP_LABEL + '/simulation_list.py',
                                                                                     mimetype = 'text/plain')),
+    (r'^simulations/list.txt$', 'django.views.generic.list_detail.object_list', dict(queryset = models.Simulation.objects.all(),
+                                                                                     allow_empty = True,
+                                                                                     template_name = APP_LABEL + '/simulation_list.txt',
+                                                                                     mimetype = 'text/plain')),
+
     # input files
 
     (r'^simulations/(?P<object_id>\d+)/parameters.txt$', view('parameters_txt')),



More information about the cig-commits mailing list