From 7183348ec9e9fb81522b4b9487b51c40d37dcb6c Mon Sep 17 00:00:00 2001 From: "Hansi, dl9rdz" Date: Fri, 16 Aug 2024 00:02:21 +0000 Subject: [PATCH] SH connector reorg -> make it asynchronous --- RX_FSK/RX_FSK.ino | 510 +---------------------------------- RX_FSK/src/ShFreqImport.cpp | 13 +- RX_FSK/src/ShFreqImport.h | 4 +- RX_FSK/src/conn-sondehub.cpp | 395 +++++++++++++++++---------- RX_FSK/src/conn-sondehub.h | 1 + 5 files changed, 272 insertions(+), 651 deletions(-) diff --git a/RX_FSK/RX_FSK.ino b/RX_FSK/RX_FSK.ino index 930a17d..93f85e7 100644 --- a/RX_FSK/RX_FSK.ino +++ b/RX_FSK/RX_FSK.ino @@ -2205,6 +2205,9 @@ void loopDecoder() { #if FEATURE_APRS connAPRS.updateStation( NULL ); #endif +#if FEATURE_SONDEHUB + connSondehub.updateStation( NULL ); +#endif #if FEATURE_SDCARD connSDCard.updateStation( NULL ); #endif @@ -3042,510 +3045,3 @@ void loop() { } -#if 0 -// removed here, now in connSondehub - -// Sondehub v2 DB related codes -/* - Update station data to the sondehub v2 DB -*/ -/* which_pos: 0=none, 1=fixed, 2=gps */ -void sondehub_station_update(WiFiClient * client, struct st_sondehub * conf) { -#define STATION_DATA_LEN 300 - char data[STATION_DATA_LEN]; - char *w; - - // If there is no connection to some WiFi AP, we cannot upload any data at all.... - if ( wifi_state != WIFI_CONNECTED ) return; - - unsigned long time_now = millis(); - // time_delta will be correct, even if time_now overflows - unsigned long time_delta = time_now - time_last_update; - - int chase = conf->chase; - // automatically decided if CHASE or FIXED mode is used (for config AUTO) - if (chase == SH_LOC_AUTO) { - if (posInfo.chase) chase = SH_LOC_CHASE; else chase = SH_LOC_FIXED; - } - - // Use 30sec update time in chase mode, 60 min in station mode. - unsigned long update_time = (chase == SH_LOC_CHASE) ? SONDEHUB_MOBILE_STATION_UPDATE_TIME : SONDEHUB_STATION_UPDATE_TIME; - - // If it is not yet time to send another update. do nothing.... - if ( (time_delta <= update_time) ) return; - - Serial.println("sondehub_station_update()"); - time_last_update = time_now; - - if (!client->connected()) { - if (!client->connect(conf->host, 80)) { - Serial.println("Connection FAILED"); - return; - } - } - - w = data; - // not necessary... memset(w, 0, STATION_DATA_LEN); - - sprintf(w, - "{" - "\"software_name\": \"%s\"," - "\"software_version\": \"%s\"," - "\"uploader_callsign\": \"%s\",", - version_name, version_id, conf->callsign); - w += strlen(w); - - // Only send email if provided - if (strlen(conf->email) != 0) { - sprintf(w, "\"uploader_contact_email\": \"%s\",", conf->email); - w += strlen(w); - } - - // Only send antenna if provided - if (strlen(conf->antenna) != 0) { - sprintf(w, "\"uploader_antenna\": \"%s\",", conf->antenna); - w += strlen(w); - } - - // We send GPS position: (a) in CHASE mode, (b) in AUTO mode if no fixed location has been specified in config - if (chase == SH_LOC_CHASE) { - if (gpsPos.valid) { - sprintf(w, - "\"uploader_position\": [%.6f,%.6f,%d]," - "\"mobile\": true", - gpsPos.lat, gpsPos.lon, gpsPos.alt); - } else { - sprintf(w, "\"uploader_position\": [null,null,null]"); - } - w += strlen(w); - } - // Otherweise, in FIXED mode we send the fixed position from config (if specified) - else if (chase == SH_LOC_FIXED) { - if ((!isnan(sonde.config.rxlat)) && (!isnan(sonde.config.rxlon))) { - if (isnan(sonde.config.rxalt)) - sprintf(w, "\"uploader_position\": [%.6f,%.6f,null]", sonde.config.rxlat, sonde.config.rxlon); - else - sprintf(w, "\"uploader_position\": [%.6f,%.6f,%d]", sonde.config.rxlat, sonde.config.rxlon, (int)sonde.config.rxalt); - } else { - sprintf(w, "\"uploader_position\": [null,null,null]"); - } - w += strlen(w); - } else { - sprintf(w, "\"uploader_position\": [null,null,null]"); - w += strlen(w); - } - - // otherwise (in SH_LOC_NONE mode) we dont include any position info - sprintf(w, "}"); - - client->println("PUT /listeners HTTP/1.1"); - client->print("Host: "); - client->println(conf->host); - client->println("accept: text/plain"); - client->println("Content-Type: application/json"); - client->print("Content-Length: "); - client->println(strlen(data)); - client->println(); - client->println(data); - Serial.println(strlen(data)); - Serial.println(data); - Serial.println("Waiting for response"); - // TODO: better do this asyncrhonously - // At least, do this safely. See Notes-on-Using-WiFiClient.txt for details - // If any of the client->print failed before (remote end closed connection), - // then calling client->read will cause a LoadProhibited exception - if (client->connected()) { - String response = client->readString(); - Serial.println(response); - Serial.println("Response done..."); - } else { - Serial.println("SH client connection closed\n"); - } - //client->stop(); -} - -/* - Update sonde data to the sondehub v2 DB -*/ -enum SHState { SH_DISCONNECTED, SH_CONNECTING, SH_CONN_IDLE, SH_CONN_APPENDING, SH_CONN_WAITACK }; - -SHState shState = SH_DISCONNECTED; -time_t shStart = 0; - - - -void sondehub_reply_handler(WiFiClient * client) { - // sondehub handler for tasks to be done even if no data is to be sent: - // process response messages from sondehub - // request frequency list (if active) -#define MSG_SIZE 1000 - char rs_msg[MSG_SIZE]; - - if (shImport == 1) { // we are waiting for a reply to a sondehub frequency import request - // while we are waiting, we do nothing else with sondehub... - int res = ShFreqImport::shImportHandleReply(&shclient); - Serial.printf("ret: %d\n", res); - // res==0 means more data is expected, res==1 means complete reply received (or error) - if (res == 1) { - shImport = 2; // finished - shImportInterval = sonde.config.sondehub.fiinterval * 60; - } - } - else { - // any reply here belongs to normal telemetry upload, lets just print it. - // and wait for a valid HTTP response - int cnt = 0; - while (client->available() > 0) { - // data is available from remote server, process it... - // readBytesUntil may wait for up to 1 second if enough data is not available... - // int cnt = client->readBytesUntil('\n', rs_msg, MSG_SIZE - 1); - int c = client->read(); - if (c < 0) break; // should never happen in available() returned >0 right before.... - rs_msg[cnt++] = c; - if (c == '\n') { - rs_msg[cnt] = 0; - Serial.println(rs_msg); - // If something that looks like a valid HTTP response is received, we are ready to send the next data item - if (shState == SH_CONN_WAITACK && cnt > 11 && strncmp(rs_msg, "HTTP/1", 6) == 0) { - shState = SH_CONN_IDLE; - } - cnt = 0; - } - if (cnt >= MSG_SIZE - 1) { - cnt = 0; - Serial.println("(overlong line from network, ignoring)"); - } - } - if (cnt > 0) { - rs_msg[cnt + 1] = 0; - Serial.println(rs_msg); - } - } - // send import requests if needed - if (sonde.config.sondehub.fiactive) { - if (shImport == 2) { - Serial.printf("next sondehub frequncy import in %d seconds\n", shImportInterval); - shImportInterval --; - if (shImportInterval <= 0) { - shImport = 0; - } - } - else if (shImport == 0) { - if (shState == SH_CONN_APPENDING || shState == SH_CONN_WAITACK) - Serial.printf("Time to request next sondehub import.... but still busy with upload request"); - else - sondehub_send_fimport(&shclient); - } - } - - // also handle periodic station updates here... - // interval check moved to sondehub_station_update to avoid having to calculate distance in auto mode twice - if (sonde.config.sondehub.active) { - if (shState == SH_CONN_IDLE || shState == SH_DISCONNECTED ) { - // (do not set station update while a telemetry report is being sent - sondehub_station_update(&shclient, &sonde.config.sondehub); - } - } -} - -void sondehub_send_fimport(WiFiClient * client) { - if (shState == SH_CONN_APPENDING || shState == SH_CONN_WAITACK) { - // Currently busy with SondeHub data upload - // So do nothing here. - // sond_fimport will be re-sent later, when shState becomes SH_CONN_IDLE - return; - } - // It's time to run, so check prerequisites - float lat = sonde.config.rxlat, lon = sonde.config.rxlon; - if (gpsPos.valid) { - lat = gpsPos.lat; - lon = gpsPos.lon; - } - - int maxdist = sonde.config.sondehub.fimaxdist; // km - int maxage = sonde.config.sondehub.fimaxage * 60; // fimaxage is hours, shImportSendRequest uses minutes - int fiinterval = sonde.config.sondehub.fiinterval; - Serial.printf("shimp : %f %f %d %d %d\n", lat, lon, maxdist, maxage, shImportInterval); - if ( !isnan(lat) && !isnan(lon) && maxdist > 0 && maxage > 0 && fiinterval > 0 ) { - int res = ShFreqImport::shImportSendRequest(&shclient, lat, lon, maxdist, maxage); - if (res == 0) shImport = 1; // Request OK: wait for response - else shImport = 2; // Request failed: wait interval, then retry - } -} - -// in hours.... max allowed diff UTC <-> sonde time -#define SONDEHUB_TIME_THRESHOLD (3) -void sondehub_send_data(WiFiClient * client, SondeInfo * s, struct st_sondehub * conf) { - Serial.println("sondehub_send_data()"); - Serial.printf("shState = %d\n", shState); - - // max age of data in JSON request (in seconds) -#define SONDEHUB_MAXAGE 15 - - char rs_msg[MSG_SIZE]; - char *w; - struct tm ts; - // config setting M10 and M20 will both decode both types, so use the real type that was decoded - uint8_t realtype = sonde.realType(s); - - // For DFM, s->d.time is data from subframe DAT8 (gps date/hh/mm), and sec is from DAT1 (gps sec/usec) - // For all others, sec should always be 0 and time the exact time in seconds - time_t t = s->d.time; - - int chase = conf->chase; - // automatically decided if CHASE or FIXED mode is used (for config AUTO) - if (chase == SH_LOC_AUTO) { - if (posInfo.chase) chase = SH_LOC_CHASE; else chase = SH_LOC_FIXED; - } - - - struct tm timeinfo; - time_t now; - time(&now); - gmtime_r(&now, &timeinfo); - if (timeinfo.tm_year <= (2016 - 1900)) { - Serial.println("Failed to obtain time"); - return; - } - - // Check if current sonde data is valid. If not, don't do anything.... - if (*s->d.ser == 0 || s->d.validID == 0 ) return; // Don't send anything without serial number - if (((int)s->d.lat == 0) && ((int)s->d.lon == 0)) return; // Sometimes these values are zeroes. Don't send those to the sondehub - if ((int)s->d.alt > 50000) return; // If alt is too high don't send to SondeHub - // M20 data does not include #sat information - if ( realtype != STYPE_M20 && (int)s->d.sats < 4) return; // If not enough sats don't send to SondeHub - - // If not connected to sondehub, try reconnecting. - // TODO: do this outside of main loop - if (!client->connected()) { - Serial.println("NO CONNECTION"); - shState = SH_DISCONNECTED; - if (!client->connect(conf->host, 80)) { - Serial.println("Connection FAILED"); - return; - } - client->Client::setTimeout(0); // does this work? - shState = SH_CONN_IDLE; - } - - if ( shState == SH_CONN_WAITACK ) { - Serial.println("Previous SH-frame not yet ack'ed, not sending new data"); - return; - } - - if ( abs(now - (time_t)s->d.time) > (3600 * SONDEHUB_TIME_THRESHOLD) ) { - Serial.printf("Sonde time %d too far from current UTC time %ld", s->d.time, now); - return; - } - - // DFM uses UTC. Most of the other radiosondes use GPS time - // SondeHub expect datetime to be the same time sytem as the sonde transmits as time stamp - if ( realtype == STYPE_RS41 || realtype == STYPE_RS92 || realtype == STYPE_M20 ) { - t += 18; // convert back to GPS time from UTC time +18s - } - - gmtime_r(&t, &ts); - - memset(rs_msg, 0, MSG_SIZE); - w = rs_msg; - - sprintf(w, - " {" - "\"software_name\": \"%s\"," - "\"software_version\": \"%s\"," - "\"uploader_callsign\": \"%s\"," - "\"time_received\": \"%04d-%02d-%02dT%02d:%02d:%02d.000Z\"," - "\"manufacturer\": \"%s\"," - "\"serial\": \"%s\"," - "\"datetime\": \"%04d-%02d-%02dT%02d:%02d:%02d.000Z\"," - "\"lat\": %.5f," - "\"lon\": %.5f," - "\"alt\": %.5f," - "\"frequency\": %.3f," - "\"vel_h\": %.5f," - "\"vel_v\": %.5f," - "\"heading\": %.5f," - "\"rssi\": %.1f," - "\"frame\": %d," - "\"type\": \"%s\",", - version_name, version_id, conf->callsign, - timeinfo.tm_year + 1900, timeinfo.tm_mon + 1, timeinfo.tm_mday, timeinfo.tm_hour, timeinfo.tm_min, timeinfo.tm_sec, - manufacturer_string[realtype], s->d.ser, - ts.tm_year + 1900, ts.tm_mon + 1, ts.tm_mday, ts.tm_hour, ts.tm_min, ts.tm_sec, - (float)s->d.lat, (float)s->d.lon, (float)s->d.alt, (float)s->freq, (float)s->d.hs, (float)s->d.vs, - (float)s->d.dir, -((float)s->rssi / 2), s->d.vframe, sondeTypeStrSH[realtype] - ); - w += strlen(w); - - // Only send sats if not M20 - if (realtype != STYPE_M20) { - sprintf(w, "\"sats\": %d,", (int)s->d.sats); - w += strlen(w); - } - - /* if there is a subtype (DFM only) */ - if ( TYPE_IS_DFM(s->type) && s->d.subtype > 0 ) { - if ( (s->d.subtype & 0xF) != DFM_UNK) { - const char *t = dfmSubtypeLong[s->d.subtype & 0xF]; - sprintf(w, "\"subtype\": \"%s\",", t); - } - else { - sprintf(w, "\"subtype\": \"DFMx%X\",", s->d.subtype >> 4); // Unknown subtype - } - w += strlen(w); - } else if ( s->type == STYPE_RS41 ) { - char buf[11]; - if (RS41::getSubtype(buf, 11, s) == 0) { - sprintf(w, "\"subtype\": \"%s\",", buf); - w += strlen(w); - } - } - - // Only send temp if provided - if (!isnan(s->d.temperature)) { - sprintf(w, "\"temp\": %.1f,", s->d.temperature); - w += strlen(w); - } - - // Only send humidity if provided - if (!isnan(s->d.relativeHumidity)) { - sprintf(w, "\"humidity\": %.1f,", s->d.relativeHumidity); - w += strlen(w); - } - - // Only send pressure if provided - if (!isnan(s->d.pressure)) { - sprintf(w, "\"pressure\": %.2f,", s->d.pressure); - w += strlen(w); - } - - // Only send burst timer if RS41 and fresh within the last 51s - if ((realtype == STYPE_RS41) && (s->d.crefKT > 0) && (s->d.vframe - s->d.crefKT < 51)) { - sprintf(w, "\"burst_timer\": %d,", (int)s->d.countKT); - w += strlen(w); - } - - // Only send battery if provided - if (s->d.batteryVoltage > 0) { - sprintf(w, "\"batt\": %.2f,", s->d.batteryVoltage); - w += strlen(w); - } - - // Only send antenna if provided - if (strlen(conf->antenna) != 0) { - sprintf(w, "\"uploader_antenna\": \"%s\",", conf->antenna); - w += strlen(w); - } - - // We send GPS position: (a) in CHASE mode, (b) in AUTO mode if no fixed location has been specified in config - if (chase == SH_LOC_CHASE) { - if (gpsPos.valid) { - sprintf(w, "\"uploader_position\": [%.6f,%.6f,%d]", gpsPos.lat, gpsPos.lon, gpsPos.alt); - } else { - sprintf(w, "\"uploader_position\": [null,null,null]"); - } - w += strlen(w); - } - // Otherweise, in FIXED mode we send the fixed position from config (if specified) - else if (chase == SH_LOC_FIXED) { - if ((!isnan(sonde.config.rxlat)) && (!isnan(sonde.config.rxlon))) { - if (isnan(sonde.config.rxalt)) - sprintf(w, "\"uploader_position\": [%.6f,%.6f,null]", sonde.config.rxlat, sonde.config.rxlon); - else - sprintf(w, "\"uploader_position\": [%.6f,%.6f,%d]", sonde.config.rxlat, sonde.config.rxlon, (int)sonde.config.rxalt); - } else { - sprintf(w, "\"uploader_position\": [null,null,null]"); - } - w += strlen(w); - } else { - sprintf(w, "\"uploader_position\": [null,null,null]"); - w += strlen(w); - } - - // otherwise (in SH_LOC_NONE mode) we dont include any position info - sprintf(w, "}"); - - if (shState != SH_CONN_APPENDING) { - sondehub_send_header(client, s, conf, &timeinfo); - sondehub_send_next(client, s, conf, rs_msg, strlen(rs_msg), 1); - shState = SH_CONN_APPENDING; - shStart = now; - } else { - sondehub_send_next(client, s, conf, rs_msg, strlen(rs_msg), 0); - } - if (now - shStart > SONDEHUB_MAXAGE) { // after MAXAGE seconds - sondehub_send_last(client, s, conf); - shState = SH_CONN_WAITACK; - shStart = 0; - } - //client->println(rs_msg); - //Serial.println(rs_msg); - //String response = client->readString(); - //Serial.println(response); -} - -void sondehub_finish_data(WiFiClient * client, SondeInfo * s, struct st_sondehub * conf) { - // If there is an "old" pending collection of JSON data sets, send it even if no now data is received - if (shState == SH_CONN_APPENDING) { - time_t now; - time(&now); - if (now - shStart > SONDEHUB_MAXAGE + 3) { // after MAXAGE seconds - sondehub_send_last(client, s, conf); - shState = SH_CONN_WAITACK; - shStart = 0; - } - } -} - -static const char *DAYS[] = {"Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"}; -static const char *MONTHS[] = {"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Noc", "Dec"}; - -void sondehub_send_header(WiFiClient * client, SondeInfo * s, struct st_sondehub * conf, struct tm * now) { - Serial.print("PUT /sondes/telemetry HTTP/1.1\r\n" - "Host: "); - Serial.println(conf->host); - Serial.print("accept: text/plain\r\n" - "Content-Type: application/json\r\n" - "Transfer-Encoding: chunked\r\n"); - - client->print("PUT /sondes/telemetry HTTP/1.1\r\n" - "Host: "); - client->println(conf->host); - client->print("accept: text/plain\r\n" - "Content-Type: application/json\r\n" - "Transfer-Encoding: chunked\r\n"); - if (now) { - Serial.printf("Date: %s, %02d %s %04d %02d:%02d:%02d GMT\r\n", - DAYS[now->tm_wday], now->tm_mday, MONTHS[now->tm_mon], now->tm_year + 1900, - now->tm_hour, now->tm_min, now->tm_sec); - client->printf("Date: %s, %02d %s %04d %02d:%02d:%02d GMT\r\n", - DAYS[now->tm_wday], now->tm_mday, MONTHS[now->tm_mon], now->tm_year + 1900, - now->tm_hour, now->tm_min, now->tm_sec); - } - client->print("User-agent: "); - client->print(version_name); - client->print("/"); - client->println(version_id); - client->println(""); // another cr lf as indication of end of header -} -void sondehub_send_next(WiFiClient * client, SondeInfo * s, struct st_sondehub * conf, char *chunk, int chunklen, int first) { - // send next chunk of JSON request - client->printf("%x\r\n", chunklen + 1); - client->write(first ? "[" : ",", 1); - client->write(chunk, chunklen); - client->print("\r\n"); - - Serial.printf("%x\r\n", chunklen + 1); - Serial.write((const uint8_t *)(first ? "[" : ","), 1); - Serial.write((const uint8_t *)chunk, chunklen); - Serial.print("\r\n"); -} -void sondehub_send_last(WiFiClient * client, SondeInfo * s, struct st_sondehub * conf) { - // last chunk. just the closing "]" of the json request - client->printf("1\r\n]\r\n0\r\n\r\n"); - Serial.printf("1\r\n]\r\n0\r\n\r\n"); -} - - -// End of sondehub v2 related codes -#endif diff --git a/RX_FSK/src/ShFreqImport.cpp b/RX_FSK/src/ShFreqImport.cpp index 407b7aa..9608948 100644 --- a/RX_FSK/src/ShFreqImport.cpp +++ b/RX_FSK/src/ShFreqImport.cpp @@ -212,13 +212,16 @@ int ShFreqImport::handleChar(char c) { } // lat lon in deg, dist in km, time in minutes -int ShFreqImport::shImportSendRequest(WiFiClient *client, float lat, float lon, int dist, int time) { +int ShFreqImport::shImportSendRequest(int client, float lat, float lon, int dist, int time) { +#if 0 +// caller should call only if connected.. if(!client->connected()) { if(!client->connect(sonde.config.sondehub.host, 80)) { Serial.println("Connection FAILED"); return 1; } } +#endif Serial.println("Sending SondeHub import request"); char req[300]; snprintf(req, 200, "GET /sondes?lat=%f&lon=%f&distance=%d&last=%d HTTP/1.1\r\n" @@ -226,7 +229,7 @@ int ShFreqImport::shImportSendRequest(WiFiClient *client, float lat, float lon, "Accept: application/json\r\n" "Cache-Control: no-cache\r\n\r\n", lat, lon, dist*1000, time*60, sonde.config.sondehub.host); - client->print(req); + dprintf(client, req); Serial.print(req); importState = START; homelat = lat; @@ -237,11 +240,15 @@ int ShFreqImport::shImportSendRequest(WiFiClient *client, float lat, float lon, } // return 0 if more data should be read (later), 1 if finished (close connection...) -int ShFreqImport::shImportHandleReply(WiFiClient *client) { +int ShFreqImport::shImportHandleReply(int client) { +#if 0 +// TODO:::: + if(!client->connected()) return 1; while(client->available()) { int res = handleChar(client->read()); if(res) return res; } +#endif return 0; } diff --git a/RX_FSK/src/ShFreqImport.h b/RX_FSK/src/ShFreqImport.h index 7e6927a..84bc9c9 100644 --- a/RX_FSK/src/ShFreqImport.h +++ b/RX_FSK/src/ShFreqImport.h @@ -10,11 +10,11 @@ class ShFreqImport { public: // Fetch data from sondehub and populate qrg.txt with result // return: 0: ok; 1: failure - static int shImportSendRequest(WiFiClient *client, float lat, float lon, int dist, int time); + static int shImportSendRequest(int client, float lat, float lon, int dist, int time); // return 0: ok, need more data; 1: finished/failure, close connection // Asynchronous I/O. Handle data if available - static int shImportHandleReply(WiFiClient *client); + static int shImportHandleReply(int client); private: static int stringToStype(const char *type); diff --git a/RX_FSK/src/conn-sondehub.cpp b/RX_FSK/src/conn-sondehub.cpp index 645b55b..420f5b0 100644 --- a/RX_FSK/src/conn-sondehub.cpp +++ b/RX_FSK/src/conn-sondehub.cpp @@ -16,35 +16,55 @@ #include #include -#include +//#include extern const char *version_name; extern const char *version_id; #define SONDEHUB_STATION_UPDATE_TIME (60*60*1000) // 60 min #define SONDEHUB_MOBILE_STATION_UPDATE_TIME (30*1000) // 30 sec -WiFiClient shclient; // Sondehub v2 + +int shclient; // Sondehub v2 +ip_addr_t shclient_ipaddr; + int shImportInterval = 0; char shImport = 0; unsigned long time_last_update = 0; -enum SHState { SH_DISCONNECTED, SH_CONNECTING, SH_CONN_IDLE, SH_CONN_APPENDING, SH_CONN_WAITACK }; +enum SHState { SH_DISCONNECTED, SH_DNSLOOKUP, SH_DNSRESOLVED, SH_CONNECTING, SH_CONN_IDLE, SH_CONN_APPENDING, SH_CONN_WAITACK, SH_CONN_WAITIMPORTRES }; -SHState shState = SH_DISCONNECTED; +SHState shclient_state = SH_DISCONNECTED; time_t shStart = 0; #define MSG_SIZE 1000 static char rs_msg[MSG_SIZE]; +int rs_msg_len = 0; static String response; +static const char *state2str(SHState state) { + switch(state) { + case SH_DISCONNECTED: return "Disconnected"; + case SH_DNSLOOKUP: return "DNS lookup"; + case SH_DNSRESOLVED: return "DNS resolved"; + case SH_CONNECTING: return "Connecting"; + case SH_CONN_IDLE: return "Connected: Idle"; + case SH_CONN_APPENDING: return "Connected: Sending data"; + case SH_CONN_WAITACK: return "Connected: Waiting for ACK from server"; + case SH_CONN_WAITIMPORTRES: return "Connected: Waiting for import reply"; + default: return "??"; + } +} + + void ConnSondehub::init() { } void ConnSondehub::netsetup() { if (sonde.config.sondehub.active && wifi_state != WIFI_APMODE) { - time_last_update = millis() + 1000; /* force sending update */ - sondehub_station_update(); + // start connecting... + sondehub_client_fsm(); + time_last_update = 0; /* force sending update */ // SH import: initial refresh on connect, even if configured interval is longer shImportInterval = 5; // refresh now in 5 seconds @@ -58,7 +78,10 @@ void ConnSondehub::netsetup() { // each second, if good decode: sondehub_send_data // each second, if no good decode: sondehub_finish_data void ConnSondehub::updateSonde( SondeInfo *si ) { - sondehub_reply_handler(); + Serial.println("SH: updateSonde called"); + sondehub_client_fsm(); + + sondehub_reply_handler(); // TODO remove, done by fsm?? if(si==NULL) { sondehub_finish_data(); } else { @@ -68,14 +91,168 @@ void ConnSondehub::updateSonde( SondeInfo *si ) { void ConnSondehub::updateStation( PosInfo *pi ) { - // TODO: station_update should be here and not in netsetup() - // Currently, interlnal reply_handler does this, using gpsInfo global variable instead of this pi + Serial.println("SH: updateStation called"); + sondehub_client_fsm(); + // Currently, internal reply_handler uses gpsInfo global variable instead of this pi + sondehub_station_update(); +} + +static void _sh_dns_found(const char * name, const ip_addr_t *ipaddr, void * /*arg*/) { + if (ipaddr) { + shclient_ipaddr = *ipaddr; + shclient_state = SH_DNSRESOLVED; // DNS lookup success + } else { + memset(&shclient_ipaddr, 0, sizeof(shclient_ipaddr)); + shclient_state = SH_DISCONNECTED; // DNS lookup failed + // TODO: set "reply messge" to "DNS lookup failed" + } +} + +// Sondehub client asynchronous FSM... +void ConnSondehub::sondehub_client_fsm() { + fd_set fdset, fdeset; + FD_ZERO(&fdset); + FD_SET(shclient, &fdset); + FD_ZERO(&fdeset); + FD_SET(shclient, &fdeset); + struct timeval selto = {0}; + + Serial.printf("SH_FSM in state %d (%s)\n", shclient_state, state2str(shclient_state)); + + switch(shclient_state) { + case SH_DISCONNECTED: + { + // We are disconnected. Try to connect, starting with a DNS lookup + err_t res = dns_gethostbyname( sonde.config.sondehub.host, &shclient_ipaddr, _sh_dns_found, NULL ); + if(res == ERR_OK) { // returns immediately if host is IP or in cache + shclient_state = SH_DNSRESOLVED; + // fall through to next switch case + } else if(res == ERR_INPROGRESS) { + shclient_state = SH_DNSLOOKUP; + break; + } else { + shclient_state = SH_DISCONNECTED; + break; + } + } + case SH_DNSRESOLVED: + { + // We have got the IP address, start the connection (asynchronously) + shclient = socket(AF_INET, SOCK_STREAM, 0); + int flags = fcntl(shclient, F_GETFL); + if (fcntl(shclient, F_SETFL, flags | O_NONBLOCK) == -1) { + Serial.println("Setting O_NONBLOCK failed"); + } + + struct sockaddr_in sock_info; + memset(&sock_info, 0, sizeof(struct sockaddr_in)); + sock_info.sin_family = AF_INET; + sock_info.sin_addr.s_addr = shclient_ipaddr.u_addr.ip4.addr; + sock_info.sin_port = htons( 80 ); + err_t res = connect(shclient, (struct sockaddr *)&sock_info, sizeof(sock_info)); + if(res) { + if (errno == EINPROGRESS) { // Should be the usual case, go to connecting state + shclient_state = SH_CONNECTING; + } else { + close(shclient); + shclient_state = SH_DISCONNECTED; + } + } else { + shclient_state = SH_CONN_IDLE; + // ok, ready to send data... + } + } + break; + + case SH_CONNECTING: + { + // Poll to see if we are now connected +// Poll to see if we are now connected + int res = select(shclient+1, NULL, &fdset, &fdeset, &selto); + if(res<0) { + Serial.println("SH_CONNECTING: select error"); + goto error; + } else if (res==0) { // still pending + break; + } + // Socket has become ready (or something went wrong, check for error first) + + int sockerr; + socklen_t len = (socklen_t)sizeof(int); + if (getsockopt(shclient, SOL_SOCKET, SO_ERROR, (void*)(&sockerr), &len) < 0) { + goto error; + } + Serial.printf("select returing %d. isset:%d iseset:%d sockerr:%d\n", res, FD_ISSET(shclient, &fdset), FD_ISSET(shclient, &fdeset), sockerr); + if(sockerr) { + Serial.printf("SH connect error: %s\n", strerror(sockerr)); + goto error; + } + shclient_state = SH_CONN_IDLE; + // ok, ready to send data... + } + break; + + case SH_CONN_IDLE: + case SH_CONN_WAITACK: + { + // In CONN_WAITACK: + // If data starts with HTTP/1 this is the expected response, move to state CONN_IDLE + // noise tolerant - should not be needed: + // if the data contains HTTP/1 copy that to the start of the buffer, ignore anything up to that point + // if not find the last \0 and append next response after the part afterwards + + int res = select(shclient+1, &fdset, NULL, NULL, &selto); + if(res<0) { + Serial.println("SH_CONN_IDLE: select error"); + goto error; + } else if (res==0) { // no data + break; + } + // Read data + char buf[512+1]; + res = read(shclient, buf, 512); + if(res<=0) { + close(shclient); + shclient_state = SH_DISCONNECTED; + } else { + // Copy to reponse + for(int i=0; ihost, 80)) { - Serial.println("Connection FAILED"); - return; - } - } - shState = SH_CONN_IDLE; - w = data; // not necessary... memset(w, 0, STATION_DATA_LEN); @@ -170,30 +344,25 @@ void ConnSondehub::sondehub_station_update() { // otherwise (in SH_LOC_NONE mode) we dont include any position info sprintf(w, "}"); - shclient.println("PUT /listeners HTTP/1.1"); - shclient.print("Host: "); - shclient.println(conf->host); - shclient.println("accept: text/plain"); - shclient.println("Content-Type: application/json"); - shclient.print("Content-Length: "); - shclient.println(strlen(data)); - shclient.println(); - shclient.println(data); - Serial.println(strlen(data)); - Serial.println(data); + dprintf( shclient, "PUT /listeners HTTP/1.1\r\n" + "Host: %s\r\n" + "accept: text/plain\r\n" + "Content-Type: application/json\r\n" + "Content-Length: %d\r\n\r\n%s", + conf->host, strlen(data), data); + + Serial.printf("PUT /listeners HTTP/1.1\n" + "Host: %s\n" + "accept: text/plain\n" + "Content-Type: application/json\n" + "Content-Length: %d\n\n%s", + conf->host, strlen(data), data); + Serial.println("Waiting for response"); - // TODO: better do this asynchronously - // At least, do this safely. See Notes-on-Using-WiFiClient.txt for details - // If any of the shclient.print failed before (remote end closed connection), - // then calling client->read will cause a LoadProhibited exception - if (shclient.connected()) { - response = shclient.readString(); - Serial.println(response); - Serial.println("Response done..."); - } else { - Serial.println("SH client connection closed\n"); - } - //client->stop(); + // Now we do this asychronously + shclient_state = SH_CONN_WAITACK; + + sondehub_client_fsm(); } /* @@ -205,46 +374,20 @@ void ConnSondehub::sondehub_reply_handler() { // process response messages from sondehub // request frequency list (if active) - if (shImport == 1) { // we are waiting for a reply to a sondehub frequency import request + if( shclient_state == SH_CONN_WAITIMPORTRES ) { // we are waiting for a reply to a sondehub frequency import request // while we are waiting, we do nothing else with sondehub... - int res = ShFreqImport::shImportHandleReply(&shclient); + int res = ShFreqImport::shImportHandleReply(shclient); Serial.printf("ret: %d\n", res); // res==0 means more data is expected, res==1 means complete reply received (or error) if (res == 1) { - shImport = 2; // finished + shclient_state = SH_CONN_IDLE; + // shImport = 2; // finished shImportInterval = sonde.config.sondehub.fiinterval * 60; } + return; } - else { - // any reply here belongs to normal telemetry upload, lets just print it. - // and wait for a valid HTTP response - int cnt = 0; - while (shclient.available() > 0) { - // data is available from remote server, process it... - // readBytesUntil may wait for up to 1 second if enough data is not available... - // int cnt = shclient.readBytesUntil('\n', rs_msg, MSG_SIZE - 1); - int c = shclient.read(); - if (c < 0) break; // should never happen in available() returned >0 right before.... - rs_msg[cnt++] = c; - if (c == '\n') { - rs_msg[cnt] = 0; - Serial.println(rs_msg); - // If something that looks like a valid HTTP response is received, we are ready to send the next data item - if (shState == SH_CONN_WAITACK && cnt > 11 && strncmp(rs_msg, "HTTP/1", 6) == 0) { - shState = SH_CONN_IDLE; - } - cnt = 0; - } - if (cnt >= MSG_SIZE - 1) { - cnt = 0; - Serial.println("(overlong line from network, ignoring)"); - } - } - if (cnt > 0) { - rs_msg[cnt + 1] = 0; - Serial.println(rs_msg); - } - } + +#if 0 // send import requests if needed if (sonde.config.sondehub.fiactive) { if (shImport == 2) { @@ -261,7 +404,9 @@ void ConnSondehub::sondehub_reply_handler() { sondehub_send_fimport(); } } +#endif +#if 0 // also handle periodic station updates here... // interval check moved to sondehub_station_update to avoid having to calculate distance in auto mode twice if (sonde.config.sondehub.active) { @@ -270,9 +415,11 @@ void ConnSondehub::sondehub_reply_handler() { sondehub_station_update(); } } +#endif } void ConnSondehub::sondehub_send_fimport() { +#if 0 if (shState == SH_CONN_APPENDING || shState == SH_CONN_WAITACK) { // Currently busy with SondeHub data upload // So do nothing here. @@ -295,6 +442,7 @@ void ConnSondehub::sondehub_send_fimport() { if (res == 0) shImport = 1; // Request OK: wait for response else shImport = 2; // Request failed: wait interval, then retry } +#endif } @@ -304,7 +452,14 @@ void ConnSondehub::sondehub_send_data(SondeInfo * s) { struct st_sondehub *conf = &sonde.config.sondehub; Serial.println("sondehub_send_data()"); - Serial.printf("shState = %d\n", shState); + Serial.printf("shclient_state = %d\n", shclient_state); + + sondehub_client_fsm(); + // Only send data when in idle or appending state.... + if(shclient_state != SH_CONN_IDLE && shclient_state != SH_CONN_APPENDING) { + Serial.println("Not in right state for sending next request..."); + return; + } // max age of data in JSON request (in seconds) #define SONDEHUB_MAXAGE 15 @@ -342,23 +497,6 @@ void ConnSondehub::sondehub_send_data(SondeInfo * s) { // M20 data does not include #sat information if ( realtype != STYPE_M20 && (int)s->d.sats < 4) return; // If not enough sats don't send to SondeHub - // If not connected to sondehub, try reconnecting. - // TODO: do this outside of main loop - if (!shclient.connected()) { - Serial.println("NO CONNECTION"); - shState = SH_DISCONNECTED; - if (!shclient.connect(conf->host, 80)) { - Serial.println("Connection FAILED"); - return; - } - shclient.Client::setTimeout(0); // does this work? - shState = SH_CONN_IDLE; - } - - if ( shState == SH_CONN_WAITACK ) { - Serial.println("Previous SH-frame not yet ack'ed, not sending new data"); - return; - } if ( abs(now - (time_t)s->d.time) > (3600 * SONDEHUB_TIME_THRESHOLD) ) { Serial.printf("Sonde time %d too far from current UTC time %ld", s->d.time, now); return; @@ -491,33 +629,29 @@ void ConnSondehub::sondehub_send_data(SondeInfo * s) { // otherwise (in SH_LOC_NONE mode) we dont include any position info sprintf(w, "}"); - if (shState != SH_CONN_APPENDING) { + if (shclient_state != SH_CONN_APPENDING) { sondehub_send_header(s, &timeinfo); sondehub_send_next(s, rs_msg, strlen(rs_msg), 1); - shState = SH_CONN_APPENDING; + shclient_state = SH_CONN_APPENDING; shStart = now; } else { sondehub_send_next(s, rs_msg, strlen(rs_msg), 0); } if (now - shStart > SONDEHUB_MAXAGE) { // after MAXAGE seconds sondehub_send_last(); - shState = SH_CONN_WAITACK; + shclient_state = SH_CONN_WAITACK; shStart = 0; } - //client->println(rs_msg); - //Serial.println(rs_msg); - //String response = client->readString(); - //Serial.println(response); } void ConnSondehub::sondehub_finish_data() { // If there is an "old" pending collection of JSON data sets, send it even if no now data is received - if (shState == SH_CONN_APPENDING) { + if (shclient_state == SH_CONN_APPENDING) { time_t now; time(&now); if (now - shStart > SONDEHUB_MAXAGE + 3) { // after MAXAGE seconds sondehub_send_last(); - shState = SH_CONN_WAITACK; + shclient_state = SH_CONN_WAITACK; shStart = 0; } } @@ -535,32 +669,27 @@ void ConnSondehub::sondehub_send_header(SondeInfo * s, struct tm * now) { "Content-Type: application/json\r\n" "Transfer-Encoding: chunked\r\n"); - shclient.print("PUT /sondes/telemetry HTTP/1.1\r\n" - "Host: "); - shclient.println(conf->host); - shclient.print("accept: text/plain\r\n" + dprintf(shclient, "PUT /sondes/telemetry HTTP/1.1\r\n" + "Host: %s\n" + "accept: text/plain\r\n" "Content-Type: application/json\r\n" - "Transfer-Encoding: chunked\r\n"); + "Transfer-Encoding: chunked\r\n", conf->host); if (now) { Serial.printf("Date: %s, %02d %s %04d %02d:%02d:%02d GMT\r\n", DAYS[now->tm_wday], now->tm_mday, MONTHS[now->tm_mon], now->tm_year + 1900, now->tm_hour, now->tm_min, now->tm_sec); - shclient.printf("Date: %s, %02d %s %04d %02d:%02d:%02d GMT\r\n", + dprintf(shclient, "Date: %s, %02d %s %04d %02d:%02d:%02d GMT\r\n", DAYS[now->tm_wday], now->tm_mday, MONTHS[now->tm_mon], now->tm_year + 1900, now->tm_hour, now->tm_min, now->tm_sec); } - shclient.print("User-agent: "); - shclient.print(version_name); - shclient.print("/"); - shclient.println(version_id); - shclient.println(""); // another cr lf as indication of end of header + dprintf(shclient, "User-agent: %s/%s\n\n", version_name, version_id); + // another cr lf as indication of end of header } void ConnSondehub::sondehub_send_next(SondeInfo * s, char *chunk, int chunklen, int first) { // send next chunk of JSON request - shclient.printf("%x\r\n", chunklen + 1); - shclient.write(first ? "[" : ",", 1); - shclient.write(chunk, chunklen); - shclient.print("\r\n"); + dprintf(shclient, "%x\r\n%c", chunklen + 1, first ? '[' : ','); + write(shclient, chunk, chunklen); + write(shclient, "\r\n", 2); Serial.printf("%x\r\n", chunklen + 1); Serial.write((const uint8_t *)(first ? "[" : ","), 1); @@ -569,33 +698,21 @@ void ConnSondehub::sondehub_send_next(SondeInfo * s, char *chunk, int chunklen, } void ConnSondehub::sondehub_send_last() { // last chunk. just the closing "]" of the json request - shclient.printf("1\r\n]\r\n0\r\n\r\n"); + dprintf(shclient, "1\r\n]\r\n0\r\n\r\n"); Serial.printf("1\r\n]\r\n0\r\n\r\n"); } -static const char *state2str(SHState state) { - switch(state) { - case SH_DISCONNECTED: return "Disconnected"; - case SH_CONNECTING: return "Connecting"; - case SH_CONN_IDLE: return "Connected: Idle"; - case SH_CONN_APPENDING: return "Connected: Sending data"; - case SH_CONN_WAITACK: return "Connected: Waiting for ACK from server"; - default: return "??"; - } -} String ConnSondehub::getStatus() { char info[1200]; time_t now; time(&now); if(shStart==0) now=-1; - snprintf(info, 1200, "State: %s. Last upload start: %ld s ago
Last reply: %s", - state2str(shState), (uint32_t)(now-shStart), rs_msg); - if(response) { - strlcat(info, "
Response: ", 1200); - int n = strlen(info); - escapeJson(info+n, response.c_str(), 1200-n); - } + snprintf(info, 1200, "State: %s. Last upload start: %ld s ago
Last reply: ", + state2str(shclient_state), (uint32_t)(now-shStart)); + int n = strlen(info); + escapeJson(info+n, rs_msg, 1200-n); + return String(info); } diff --git a/RX_FSK/src/conn-sondehub.h b/RX_FSK/src/conn-sondehub.h index 416a858..917c577 100644 --- a/RX_FSK/src/conn-sondehub.h +++ b/RX_FSK/src/conn-sondehub.h @@ -27,6 +27,7 @@ public: String getName(); private: + void sondehub_client_fsm(); void sondehub_station_update(); void sondehub_reply_handler(); void sondehub_send_fimport();