[cig-commits] r8350 - in cs/portal/trunk/hello: . hello
hello/templates/hello
leif at geodynamics.org
leif at geodynamics.org
Wed Nov 28 17:27:39 PST 2007
Author: leif
Date: 2007-11-28 17:27:38 -0800 (Wed, 28 Nov 2007)
New Revision: 8350
Added:
cs/portal/trunk/hello/daemon.c
cs/portal/trunk/hello/hello/templates/hello/simulation_list.txt
Modified:
cs/portal/trunk/hello/hello/urls.py
Log:
Wrote another simple daemon for the "hello world" portal. For fun, I
wrote this one in plain old C. The HW portal now exports "list.txt"
(in addition to "list.py") to facilitate parsing using fscanf().
Currently, each simulation run by this daemon is a no-op: it simply
marches each simulation through the various states -- POSTing the
status to the portal -- without actually executing any Globus
commands.
Added: cs/portal/trunk/hello/daemon.c
===================================================================
--- cs/portal/trunk/hello/daemon.c 2007-11-29 00:15:05 UTC (rev 8349)
+++ cs/portal/trunk/hello/daemon.c 2007-11-29 01:27:38 UTC (rev 8350)
@@ -0,0 +1,356 @@
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+
+#include <assert.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+
+/* Seconds passed to sleep() at the end of each pass of the main polling
+   loop.  Like the settings below, it can be overridden at compile time. */
+#ifndef SLEEP_INTERVAL
+#define SLEEP_INTERVAL 1
+#endif
+
+/* Host name of the portal; resolved in main() with gethostbyname(). */
+#ifndef PORTAL_HOSTNAME
+#define PORTAL_HOSTNAME "localhost"
+#endif
+
+/* TCP port of the portal's HTTP server (converted with htons() in main()). */
+#ifndef PORTAL_HTTP_PORT
+#define PORTAL_HTTP_PORT 8000
+#endif
+
+/* Path prefix prepended to every request path sent to the portal. */
+#ifndef PORTAL_URL_ROOT
+#define PORTAL_URL_ROOT "/hello"
+#endif
+
+
+/* Lifecycle state of a simulation.  statusStr() maps each value to the
+   string that is POSTed to the portal. */
+typedef enum SimStatus {
+ SIM_STATUS_NEW, /* pseudo status code -- "NEW" is never actually reported by Globus */
+ SIM_STATUS_PENDING, /* set by watch_for_simulations() when a new simulation is picked up */
+ SIM_STATUS_ACTIVE, /* set by monitor_simulation() after PENDING */
+ SIM_STATUS_DONE, /* set by monitor_simulation() after ACTIVE; not advanced further */
+ SIM_STATUS_FAILED /* not assigned anywhere in this revision; not advanced further */
+} SimStatus;
+
+/* One simulation tracked by the daemon.  Records form a singly linked
+   list; scan_simulation_list() pushes new records onto the head. */
+typedef struct Simulation {
+ struct Simulation *next; /* next record in the list, or NULL at the tail */
+ unsigned int id; /* ID in server database */
+ SimStatus status;
+ unsigned int nodes; /* number of MPI processes */
+ char *globusJobId; /* passed to 'globus-job-status' */
+ pid_t gassServer; /* child process ID of GASS server */
+ /* sleepTimer counts polling passes since the simulation was last
+    monitored; it is monitored again once sleepTimer exceeds stagnancy. */
+ unsigned int sleepTimer, stagnancy;
+} Simulation;
+
+
+/*------------------------------------------------------------------------
+ HTTP utility routines
+ ------------------------------------------------------------------------*/
+
+
+/*
+ * Open a TCP connection to the portal and wrap it in two stdio streams.
+ *
+ * serv_addr  address of the portal's HTTP server
+ * sockin     receives a stream opened for reading from the socket
+ * sockout    receives a stream opened for writing to a dup() of the socket
+ *
+ * Returns 0 on success; the caller must fclose() both streams.
+ * Returns -1 on failure after reporting the error with perror(); in that
+ * case every descriptor/stream opened here has been released again and
+ * *sockin and *sockout are NULL, so the caller has nothing to close.
+ */
+int connect_to_portal(struct sockaddr_in *serv_addr, FILE **sockin, FILE **sockout)
+{
+    int sockfd, newsockfd;
+    FILE *in, *out;
+
+    /* leave the outputs in a defined state on every failure path */
+    *sockin = NULL;
+    *sockout = NULL;
+
+    sockfd = socket(AF_INET, SOCK_STREAM, 0);
+    if (sockfd == -1) {
+        perror("socket");
+        return -1;
+    }
+    if (connect(sockfd, (struct sockaddr *)serv_addr, sizeof(*serv_addr)) == -1) {
+        perror("connect");
+        close(sockfd);
+        return -1;
+    }
+
+    in = fdopen(sockfd, "r");
+    if (!in) {
+        perror("fdopen");
+        close(sockfd);
+        return -1;
+    }
+
+    /* the write side gets its own descriptor so that each stream can be
+       fclose()d independently; from here on 'in' owns sockfd */
+    newsockfd = dup(sockfd);
+    if (newsockfd == -1) {
+        perror("dup");
+        fclose(in);
+        return -1;
+    }
+    out = fdopen(newsockfd, "w");
+    if (!out) {
+        perror("fdopen");
+        close(newsockfd);
+        fclose(in);
+        return -1;
+    }
+
+    *sockin = in;
+    *sockout = out;
+    return 0;
+}
+
+
+/*
+ * Send a bare HTTP/1.0 GET request for 'path' on 'sockout' and flush it.
+ * Returns 0 when the request was written and flushed, -1 otherwise.
+ */
+int get(const char *path, FILE *sockout)
+{
+    if (fprintf(sockout, "GET %s HTTP/1.0\r\n\r\n", path) < 0) {
+        return -1;
+    }
+
+    /* push the request onto the wire before the caller starts reading */
+    return (fflush(sockout) == 0) ? 0 : -1;
+}
+
+
+/*
+ * Send an HTTP/1.0 POST request with a form-urlencoded body.
+ *
+ * content         request body; need not be NUL-terminated
+ * content_length  number of bytes of 'content' to send
+ * path            request path
+ * sockout         stream to write the request to
+ *
+ * Exactly content_length bytes are written, so the body always agrees
+ * with the Content-length header.  Returns 0 on success, -1 if writing
+ * or flushing fails.
+ */
+int post(const char *content, size_t content_length, const char *path, FILE *sockout)
+{
+    int ret;
+
+    ret = fprintf(sockout,
+                  "POST %s HTTP/1.0\r\n"
+                  "Content-type: application/x-www-form-urlencoded\r\n"
+                  "Content-length: %lu\r\n"
+                  "\r\n",
+                  path,
+                  (unsigned long)content_length);
+    if (ret < 0) {
+        return -1;
+    }
+
+    /* write the body by length rather than with "%s": the buffer is not
+       required to be NUL-terminated */
+    if (content_length > 0 &&
+        fwrite(content, 1, content_length, sockout) != content_length) {
+        return -1;
+    }
+
+    if (fflush(sockout) != 0) {
+        return -1;
+    }
+
+    return 0;
+}
+
+
+/*
+ * Format a request body printf-style and POST it to 'path'.
+ *
+ * path     request path
+ * sockout  stream to write the request to
+ * format   printf-style format string for the body, followed by its
+ *          arguments
+ *
+ * The body is formatted into a heap buffer sized by a first
+ * vsnprintf() pass, so it is always NUL-terminated and no temporary
+ * file is needed.  Failures are reported on stderr; nothing is
+ * returned to the caller.
+ */
+void fpostf(const char *path, FILE *sockout, char *format, ...)
+{
+    va_list ap, ap_fill;
+    char *content;
+    int length;
+
+    /* first pass: measure the formatted body */
+    va_start(ap, format);
+    va_copy(ap_fill, ap);
+    length = vsnprintf(NULL, 0, format, ap);
+    va_end(ap);
+    if (length < 0) {
+        fprintf(stderr, "fpostf: cannot format request body\n");
+        va_end(ap_fill);
+        return;
+    }
+
+    /* second pass: format into a buffer with room for the terminator */
+    content = (char *)malloc((size_t)length + 1);
+    if (!content) {
+        perror("malloc");
+        va_end(ap_fill);
+        return;
+    }
+    vsnprintf(content, (size_t)length + 1, format, ap_fill);
+    va_end(ap_fill);
+
+    /* post */
+    if (post(content, (size_t)length, path, sockout) != 0) {
+        fprintf(stderr, "fpostf: failed to send request to %s\n", path);
+    }
+
+    free(content);
+}
+
+
+/*
+ * Consume an HTTP response's status line and headers from 'sockin',
+ * stopping just after the blank line that ends them (or at EOF).
+ *
+ * Lines longer than the local buffer arrive from fgets() in several
+ * chunks.  Only a chunk that starts at the beginning of a line may be
+ * taken for the blank separator line; otherwise the tail of a long
+ * header that happens to be exactly "\r\n" would end the scan early.
+ * A bare "\n" is accepted as a blank line too.
+ */
+void skip_http_headers(FILE *sockin)
+{
+    char line[80];
+    int at_line_start = 1;
+
+    while (fgets(line, sizeof(line), sockin)) {
+        size_t len = strlen(line);
+
+        if (at_line_start &&
+            (strcmp(line, "\r\n") == 0 || strcmp(line, "\n") == 0)) {
+            break;
+        }
+        /* the next chunk starts a new line only if this one ended in '\n' */
+        at_line_start = (len > 0 && line[len - 1] == '\n');
+    }
+}
+
+
+/*------------------------------------------------------------------------
+ simulation monitoring
+ ------------------------------------------------------------------------*/
+
+
+/*
+ * Translate a status code into the string shown to the user in the
+ * portal.  A value outside the enumeration trips assert(0); when
+ * assertions are compiled out, "xxx" is returned instead.
+ */
+const char *statusStr(SimStatus status)
+{
+    const char *label = NULL;
+
+    switch (status) {
+    case SIM_STATUS_NEW:
+        label = "new";
+        break;
+    case SIM_STATUS_PENDING:
+        label = "pending";
+        break;
+    case SIM_STATUS_ACTIVE:
+        label = "active";
+        break;
+    case SIM_STATUS_DONE:
+        label = "done";
+        break;
+    case SIM_STATUS_FAILED:
+        label = "failed";
+        break;
+    }
+
+    if (label) {
+        return label;
+    }
+
+    assert(0);
+    return "xxx";
+}
+
+
+/*
+ * Report a simulation's current status to the portal.
+ *
+ * sim        simulation whose id and status are reported
+ * serv_addr  address of the portal's HTTP server
+ *
+ * Opens a fresh connection, POSTs "status=<string>" to the simulation's
+ * status URL, echoes the first line of the response body to stdout and
+ * closes the connection.  Returns silently if the connection cannot be
+ * established (connect_to_portal() has already reported the error).
+ */
+void post_status(Simulation *sim, struct sockaddr_in *serv_addr)
+{
+    FILE *sockin, *sockout;
+    const char *status;
+    char path[80], line[80];
+    int n;
+
+    /* Build the path before connecting so that there is nothing to clean
+       up if it does not fit.  PORTAL_URL_ROOT can be overridden at build
+       time, so the length must be checked rather than assumed. */
+    n = snprintf(path, sizeof(path),
+                 PORTAL_URL_ROOT "/simulations/%u/status/", sim->id);
+    if (n < 0 || (size_t)n >= sizeof(path)) {
+        fprintf(stderr, "post_status: URL path too long for simulation %u\n",
+                sim->id);
+        return;
+    }
+
+    if (-1 == connect_to_portal(serv_addr, &sockin, &sockout)) {
+        return;
+    }
+
+    status = statusStr(sim->status);
+    printf("POST %s status=%s\n", path, status);
+    fpostf(path, sockout, "status=%s", status);
+
+    skip_http_headers(sockin);
+    if (fgets(line, sizeof(line), sockin)) {
+        printf("response: %s\n", line);
+    }
+
+    fclose(sockin);
+    fclose(sockout);
+}
+
+
+/*
+ * Advance a simulation by one step and report the new status:
+ * PENDING becomes ACTIVE and ACTIVE becomes DONE.  Simulations in any
+ * other state are left untouched and nothing is posted.
+ */
+void monitor_simulation(Simulation *sim, struct sockaddr_in *serv_addr)
+{
+    SimStatus successor;
+
+    if (sim->status == SIM_STATUS_PENDING) {
+        successor = SIM_STATUS_ACTIVE;
+    } else if (sim->status == SIM_STATUS_ACTIVE) {
+        successor = SIM_STATUS_DONE;
+    } else {
+        /* NEW, DONE and FAILED have no transition here */
+        return;
+    }
+
+    sim->status = successor;
+    post_status(sim, serv_addr);
+}
+
+
+/*------------------------------------------------------------------------
+ main processing loop
+ ------------------------------------------------------------------------*/
+
+
+/*
+ * Parse the portal's "list.txt" response and queue every new simulation.
+ *
+ * sockin   stream positioned at the start of the HTTP response
+ * simList  head of the simulation list; new records are pushed onto it
+ *
+ * After the headers, each record is read as "<id> <status> <nodes>".
+ * Records whose status is "new" get a freshly allocated Simulation that
+ * is pushed onto the head of *simList; all other records are ignored.
+ * Parsing stops at the first record that does not match, or when memory
+ * runs out (reported with perror()).
+ */
+void scan_simulation_list(FILE *sockin, Simulation **simList)
+{
+    unsigned int id;
+    char status[32];
+    unsigned int nodes;
+    Simulation *sim;
+
+    skip_http_headers(sockin);
+    while (fscanf(sockin, "%u %31s %u\n", &id, status, &nodes) == 3) {
+        if (strcmp(status, "new") != 0) {
+            continue;
+        }
+
+        /* new simulation */
+        sim = (Simulation *)malloc(sizeof(*sim));
+        if (!sim) {
+            perror("malloc");
+            return;
+        }
+        sim->next = *simList;
+        sim->id = id;
+        sim->status = SIM_STATUS_NEW;
+        sim->nodes = nodes;
+        sim->globusJobId = NULL;
+        sim->gassServer = (pid_t)-1;
+        sim->sleepTimer = 0;
+        sim->stagnancy = 0;
+
+        *simList = sim;
+    }
+}
+
+
+/*
+ * Placeholder: currently does nothing with the simulation list.
+ */
+void reap_children(Simulation *simList)
+{
+    (void)simList; /* intentionally unused for now */
+}
+
+
+/*
+ * Main polling loop; never returns.
+ *
+ * serv_addr  address of the portal's HTTP server
+ *
+ * Each pass fetches the simulation list from the portal, marks newly
+ * discovered simulations as PENDING, gives every simulation whose sleep
+ * timer has expired to monitor_simulation(), and then sleeps for
+ * SLEEP_INTERVAL seconds.
+ */
+void watch_for_simulations(struct sockaddr_in *serv_addr)
+{
+    FILE *sockin, *sockout;
+    Simulation *sim, *simList = NULL;
+
+    while (1) {
+
+        reap_children(simList);
+
+        /* The streams are valid only when the connection succeeded, so
+           they are closed inside this branch and nowhere else. */
+        if (connect_to_portal(serv_addr, &sockin, &sockout) != -1) {
+            if (get(PORTAL_URL_ROOT "/simulations/list.txt", sockout) != -1) {
+                scan_simulation_list(sockin, &simList);
+            }
+            fclose(sockin);
+            fclose(sockout);
+        }
+
+        /* start new simulations -- new records are pushed onto the head
+           of the list, so they form a contiguous run at the front */
+        for (sim = simList; sim && sim->status == SIM_STATUS_NEW; sim = sim->next) {
+            /*
+             * Tell the server we've processed this simulation. This
+             * also prevents us from creating duplicate Simulation
+             * records on the next iteration of this loop!
+             */
+            sim->status = SIM_STATUS_PENDING;
+            post_status(sim, serv_addr);
+
+            /* Wait two ticks before checking this Simulation again. */
+            sim->stagnancy = 2;
+        }
+
+        /* monitor old simulations */
+        for (sim = simList; sim; sim = sim->next) {
+            if (sim->sleepTimer > sim->stagnancy) {
+                monitor_simulation(sim, serv_addr);
+                sim->sleepTimer = 0;
+            } else {
+                ++sim->sleepTimer;
+            }
+        }
+
+        sleep(SLEEP_INTERVAL);
+    }
+}
+
+
+/*------------------------------------------------------------------------
+ main program
+ ------------------------------------------------------------------------*/
+
+
+/*
+ * Resolve the portal's host name and enter the polling loop.
+ * Exits with EXIT_FAILURE if the name cannot be resolved to an IPv4
+ * address; otherwise watch_for_simulations() never returns.
+ */
+int main(void)
+{
+    struct hostent *he;
+    struct sockaddr_in serv_addr;
+
+    he = gethostbyname(PORTAL_HOSTNAME);
+    if (!he) {
+        herror("gethostbyname");
+        return EXIT_FAILURE;
+    }
+    /* sockaddr_in can only carry an IPv4 address */
+    if (he->h_addrtype != AF_INET || !he->h_addr_list[0]) {
+        fprintf(stderr, "gethostbyname: no IPv4 address for %s\n", PORTAL_HOSTNAME);
+        return EXIT_FAILURE;
+    }
+
+    memset(&serv_addr, 0, sizeof(serv_addr));
+    serv_addr.sin_family = AF_INET;
+    serv_addr.sin_port = htons(PORTAL_HTTP_PORT);
+    /* copy bytewise: h_addr_list entries are char pointers with no
+       alignment guarantee for struct in_addr */
+    memcpy(&serv_addr.sin_addr, he->h_addr_list[0], sizeof(serv_addr.sin_addr));
+
+    watch_for_simulations(&serv_addr);
+    return 0;
+}
+
+
+/* end of file */
Added: cs/portal/trunk/hello/hello/templates/hello/simulation_list.txt
===================================================================
--- cs/portal/trunk/hello/hello/templates/hello/simulation_list.txt 2007-11-29 00:15:05 UTC (rev 8349)
+++ cs/portal/trunk/hello/hello/templates/hello/simulation_list.txt 2007-11-29 01:27:38 UTC (rev 8350)
@@ -0,0 +1,2 @@
+{% for object in object_list %}
+{{ object.id }} {{ object.status }} {{ object.nodes }}{% endfor %}
Modified: cs/portal/trunk/hello/hello/urls.py
===================================================================
--- cs/portal/trunk/hello/hello/urls.py 2007-11-29 00:15:05 UTC (rev 8349)
+++ cs/portal/trunk/hello/hello/urls.py 2007-11-29 01:27:38 UTC (rev 8350)
@@ -39,6 +39,11 @@
allow_empty = True,
template_name = APP_LABEL + '/simulation_list.py',
mimetype = 'text/plain')),
+ (r'^simulations/list.txt$', 'django.views.generic.list_detail.object_list', dict(queryset = models.Simulation.objects.all(),
+ allow_empty = True,
+ template_name = APP_LABEL + '/simulation_list.txt',
+ mimetype = 'text/plain')),
+
# input files
(r'^simulations/(?P<object_id>\d+)/parameters.txt$', view('parameters_txt')),
More information about the cig-commits
mailing list