Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,7 @@ COPY --from=jre-build /javaruntime $JAVA_HOME
COPY --from=builder-staging /staging/ /

RUN true && \
mv /etc/iofog-agent/config_new.xml /etc/iofog-agent/config.xml && \
mv /etc/iofog-agent/config-development_new.xml /etc/iofog-agent/config-development.xml && \
mv /etc/iofog-agent/config-production_new.xml /etc/iofog-agent/config-production.xml && \
mv /etc/iofog-agent/config-switcher_new.xml /etc/iofog-agent/config-switcher.xml && \
mv /etc/iofog-agent/config_new.yaml /etc/iofog-agent/config.yaml && \
mv /etc/iofog-agent/cert_new.crt /etc/iofog-agent/cert.crt && \
# </dev/urandom tr -dc A-Za-z0-9 | head -c32 > /etc/iofog-agent/local-api && \
mkdir -p /var/backups/iofog-agent && \
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {

allprojects {
group = 'org.eclipse'
version = '3.5.6'
version = '3.6.0'
}

subprojects {
Expand Down
1 change: 1 addition & 0 deletions iofog-agent-daemon/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies {
implementation 'com.google.crypto.tink:tink:1.9.0'
implementation 'org.bouncycastle:bcprov-jdk18on:1.80'
implementation 'org.msgpack:msgpack-core:0.9.8'
implementation 'org.yaml:snakeyaml:2.2'
testImplementation 'org.mockito:mockito-core:5.4.0'
testImplementation 'org.mockito:mockito-junit-jupiter:5.4.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ public void write(int b) {
}

public static void main(String[] args) throws ParseException {
// Set LogManager system property FIRST, before any other code
System.setProperty("java.util.logging.manager", "org.jboss.logmanager.LogManager");

try {
Configuration.load();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,7 @@ public String perform(String[] args) {

try {

HashMap<String, String> oldValuesMap = getOldNodeValuesForParameters(config.keySet(),
Configuration.getCurrentConfig());
HashMap<String, String> oldValuesMap = Configuration.getOldNodeValuesForParameters(config.keySet());
HashMap<String, String> errorMap = setConfig(config, false);

for (Map.Entry<String, String> e : errorMap.entrySet())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public enum CommandLineConfigParam {
GPS_SCAN_FREQUENCY("60", "gpsf", "gps_scan_freq", "gpsScanFrequency"),
GPS_COORDINATES ("", "", "gps_coordinates", "gpscoordinates"),
POST_DIAGNOSTICS_FREQ ("10", "df", "post_diagnostics_freq", "postdiagnosticsfreq"),
FOG_TYPE ("auto", "ft", "fog_type", ""),
ARCH ("auto", "ft", "arch", ""),
SECURE_MODE ("off", "sec", "secure_mode", ""),
ROUTER_HOST ("", "", "router_host", "routerHost"),
ROUTER_PORT ("0", "", "router_port", "routerPort"),
Expand All @@ -64,6 +64,7 @@ public enum CommandLineConfigParam {
READY_TO_UPGRADE_SCAN_FREQUENCY ("24", "uf", "upgrade_scan_frequency", "readyToUpgradeScanFrequency"),
DEV_MODE ("off", "dev", "dev_mode", ""),
TIME_ZONE("", "tz", "time_zone", "timeZone"),
NAMESPACE("default", "", "namespace", "namespace"),
CA_CERT("", "", "ca_cert", "caCert"),
TLS_CERT("", "", "tls_cert", "tlsCert"),
TLS_KEY("", "", "tls_key", "tlsKey"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class FieldAgent implements IOFogModule {
private ScheduledFuture<?> futureTask;
private EdgeResourceManager edgeResourceManager;
private VolumeMountManager volumeMountManager;
private LogSessionManager logSessionManager;

private final Map<String, String> activeExecSessions = new ConcurrentHashMap<>();
private final Map<String, ExecSessionCallback> execCallbacks = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -499,6 +500,16 @@ private final Future<Boolean> processChanges(JsonObject changes) {
resetChanges = false;
}
}
if (changes.getBoolean("microserviceLogs", false) || changes.getBoolean("fogLogs", false)) {
logDebug("Processing log sessions changes - microserviceLogs: " + changes.getBoolean("microserviceLogs", false) +
", fogLogs: " + changes.getBoolean("fogLogs", false));
try {
handleLogSessions();
} catch (Exception e) {
logError("Unable to handle log sessions", e);
resetChanges = false;
}
}
}
logDebug("Finished processing changes with resetChanges: " + resetChanges);
return resetChanges;
Expand Down Expand Up @@ -1066,7 +1077,15 @@ private Function<JsonObject, Microservice> containerJsonObjectToMicroserviceFunc
.boxed()
.map(volumeMappingObj::getJsonObject)
.map(volumeMapping -> {
VolumeMappingType volumeMappingType = volumeMapping.getString("type", "bind").equals("volume") ? VolumeMappingType.VOLUME : VolumeMappingType.BIND;
VolumeMappingType volumeMappingType;
String typeStr = volumeMapping.getString("type", "bind");
if ("volumeMount".equals(typeStr)) {
volumeMappingType = VolumeMappingType.VOLUME_MOUNT;
} else if ("volume".equals(typeStr)) {
volumeMappingType = VolumeMappingType.VOLUME;
} else {
volumeMappingType = VolumeMappingType.BIND;
}
return new VolumeMapping(volumeMapping.getString("hostDestination"),
volumeMapping.getString("containerDestination"),
volumeMapping.getString("accessMode"),
Expand Down Expand Up @@ -1145,6 +1164,48 @@ private Function<JsonObject, Microservice> containerJsonObjectToMicroserviceFunc
microservice.setMemoryLimit(jsonObj.getJsonNumber("memoryLimit").longValue());
}

JsonValue serviceAccountValue = jsonObj.get("serviceAccount");
if (serviceAccountValue != null && !serviceAccountValue.getValueType().equals(JsonValue.ValueType.NULL)) {
JsonObject serviceAccountObj = (JsonObject) serviceAccountValue;
String serviceAccountName = serviceAccountObj.containsKey("name") && !serviceAccountObj.isNull("name")
? serviceAccountObj.getString("name") : null;

RoleRef roleRef = null;
JsonValue roleRefValue = serviceAccountObj.get("roleRef");
if (roleRefValue != null && !roleRefValue.getValueType().equals(JsonValue.ValueType.NULL)) {
JsonObject roleRefObj = (JsonObject) roleRefValue;
String kind = roleRefObj.containsKey("kind") && !roleRefObj.isNull("kind")
? roleRefObj.getString("kind") : null;
String name = roleRefObj.containsKey("name") && !roleRefObj.isNull("name")
? roleRefObj.getString("name") : null;
if (kind != null && name != null) {
roleRef = new RoleRef(kind, name);
}
}

List<Rule> rules = null;
JsonValue rulesValue = serviceAccountObj.get("rules");
if (rulesValue != null && !rulesValue.getValueType().equals(JsonValue.ValueType.NULL)) {
JsonArray rulesArray = (JsonArray) rulesValue;
if (rulesArray.size() > 0) {
rules = IntStream.range(0, rulesArray.size())
.boxed()
.map(rulesArray::getJsonObject)
.map(ruleObj -> {
List<String> apiGroups = getStringList(ruleObj.get("apiGroups"));
List<String> resources = getStringList(ruleObj.get("resources"));
List<String> verbs = getStringList(ruleObj.get("verbs"));
return new Rule(apiGroups, resources, verbs);
})
.collect(toList());
}
}

if (serviceAccountName != null || roleRef != null || rules != null) {
microservice.setServiceAccount(new ServiceAccount(serviceAccountName, roleRef, rules));
}
}

try {
LoggingService.setupMicroserviceLogger(microservice.getMicroserviceUuid(), microservice.getLogSize());
} catch (IOException e) {
Expand Down Expand Up @@ -1649,6 +1710,7 @@ public JsonObject provision(String key) {
// Set initial configuration
Configuration.setIofogUuid(provisioningResult.getString("uuid"));
Configuration.setPrivateKey(provisioningResult.getString("privateKey"));
Configuration.setNamespace(provisioningResult.getString("namespace"));
Configuration.saveConfigUpdates();
Configuration.updateConfigBackUpFile();

Expand Down Expand Up @@ -1748,6 +1810,7 @@ public String deProvision(boolean isTokenExpired) {
// Store configuration values before clearing them
String iofogUuid = Configuration.getIofogUuid();
String privateKey = Configuration.getPrivateKey();
String namespace = Configuration.getNamespace();

// Attempt deprovision request if not token expired
boolean deprovisionRequestSuccessful = false;
Expand Down Expand Up @@ -1780,6 +1843,7 @@ public String deProvision(boolean isTokenExpired) {
// Configuration.setAccessToken("");
Configuration.setPrivateKey("");
Configuration.saveConfigUpdates();
Configuration.setNamespace("default");
logDebug("Configuration cleared successfully");

// Reset JWT Manager to clear static state and allow re-initialization with new credentials
Expand Down Expand Up @@ -1912,6 +1976,7 @@ public void start() {
sshProxyManager = new SshProxyManager(new SshConnection());
edgeResourceManager = EdgeResourceManager.getInstance();
volumeMountManager = VolumeMountManager.getInstance();
logSessionManager = new LogSessionManager();
boolean isConnected = ping();
getFogConfig();
if (!notProvisioned()) {
Expand Down Expand Up @@ -2462,4 +2527,134 @@ public void handleExecSessionClose(String microserviceUuid, String execId) {
LoggingService.logError(MODULE_NAME, "Error handling exec session close", e);
}
}

/**
* Fetches log sessions from controller
* @return List of LogSession objects
* @throws Exception if fetch fails
*/
private List<LogSession> fetchLogSessions() throws Exception {
logDebug("Start fetching log sessions from controller");
List<LogSession> sessions = new ArrayList<>();

if (notProvisioned() || !isControllerConnected(false)) {
logDebug("Not provisioned or not connected, returning empty list");
return sessions;
}

// Check thread interruption before making request
if (Thread.currentThread().isInterrupted()) {
logWarning("Thread interrupted before making log sessions request");
throw new InterruptedException("Thread interrupted before request");
}

try {
logDebug("Making request to controller for log sessions");
JsonObject response = orchestrator.request("logs/sessions", RequestType.GET, null, null);
logDebug("Received response from controller, parsing log sessions");
if (response != null && response.containsKey("logSessions")) {
JsonArray logSessionsArray = response.getJsonArray("logSessions");
if (logSessionsArray != null) {
for (int i = 0; i < logSessionsArray.size(); i++) {
JsonObject sessionJson = logSessionsArray.getJsonObject(i);
LogSession session = parseLogSession(sessionJson);
if (session != null) {
sessions.add(session);
}
}
}
}
logDebug("Fetched " + sessions.size() + " log sessions from controller");
} catch (CertificateException | SSLHandshakeException e) {
verificationFailed(e);
logError("Unable to get log sessions due to broken certificate",
new AgentSystemException(e.getMessage(), e));
throw e;
} catch (Exception e) {
logError("Unable to get log sessions", new AgentSystemException(e.getMessage(), e));
throw e;
}

logDebug("Finished fetching log sessions");
return sessions;
}

/**
* Parses a JSON object into a LogSession
*/
private LogSession parseLogSession(JsonObject jsonObj) {
try {
String sessionId = jsonObj.getString("sessionId");
String microserviceUuid = jsonObj.containsKey("microserviceUuid") && !jsonObj.isNull("microserviceUuid")
? jsonObj.getString("microserviceUuid") : null;
String iofogUuid = jsonObj.containsKey("iofogUuid") && !jsonObj.isNull("iofogUuid")
? jsonObj.getString("iofogUuid") : null;
String status = jsonObj.containsKey("status") ? jsonObj.getString("status") : "PENDING";
boolean agentConnected = jsonObj.containsKey("agentConnected") ? jsonObj.getBoolean("agentConnected") : false;

// Parse tailConfig
Map<String, Object> tailConfig = new HashMap<>();
if (jsonObj.containsKey("tailConfig") && !jsonObj.isNull("tailConfig")) {
JsonObject tailConfigJson = jsonObj.getJsonObject("tailConfig");
if (tailConfigJson != null) {
// Parse tailConfig fields
if (tailConfigJson.containsKey("lines")) {
tailConfig.put("lines", tailConfigJson.getInt("lines"));
}
if (tailConfigJson.containsKey("follow")) {
tailConfig.put("follow", tailConfigJson.getBoolean("follow"));
}
if (tailConfigJson.containsKey("since") && !tailConfigJson.isNull("since")) {
tailConfig.put("since", tailConfigJson.getString("since"));
}
if (tailConfigJson.containsKey("until") && !tailConfigJson.isNull("until")) {
tailConfig.put("until", tailConfigJson.getString("until"));
}
}
}

LogSession session = new LogSession(sessionId, microserviceUuid, iofogUuid, tailConfig, status, agentConnected);
return session;
} catch (Exception e) {
logError("Error parsing log session from JSON", new AgentSystemException(e.getMessage(), e));
return null;
}
}

/**
* Handles log sessions changes
*/
private void handleLogSessions() {
logDebug("Start handling log sessions");

// Check if thread is already interrupted
if (Thread.currentThread().isInterrupted()) {
logWarning("Thread already interrupted before handling log sessions");
return;
}

try {
List<LogSession> sessions = fetchLogSessions();
if (logSessionManager != null) {
logSessionManager.handleLogSessions(sessions);
} else {
logError("LogSessionManager is not initialized", new AgentSystemException("LogSessionManager is null", null));
}
} catch (AgentSystemException e) {
// Check if it's an interruption (might be transient)
if (e.getMessage() != null && e.getMessage().contains("Request interrupted")) {
logWarning("Log session fetch was interrupted (may be transient): " + e.getMessage());
// Don't reset changes flag for interruptions - allow retry on next change detection
} else {
logError("Unable to handle log sessions", e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logWarning("Thread interrupted while handling log sessions: " + e.getMessage());
// Don't reset changes flag for interruptions - allow retry on next change detection
} catch (Exception e) {
logError("Unable to handle log sessions", e);
}
logDebug("Finished handling log sessions");
}
}
Loading
Loading