333 lines
12 KiB
Python
333 lines
12 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# Copyright 2014 The Plaso Project Authors.
|
|
# Please see the AUTHORS file for details on individual authors.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""This file contains a foreman class for monitoring workers."""
|
|
|
|
import collections
|
|
import logging
|
|
|
|
from plaso.multi_processing import process_info
|
|
|
|
|
|
class Foreman(object):
  """A foreman class that monitors workers.

  The Foreman is responsible for monitoring worker processes
  and give back status information. The status information contains
  among other things:
    + Number of events extracted from each worker.
    + Path of the current file the worker is processing.
    + Indications whether the worker is alive or not.
    + Memory consumption of the worker.

  This information is gathered using both RPC calls to the worker
  itself as well as data provided by the psutil library.

  In the future the Foreman should be able to actively monitor
  the health of the processes and terminate and restart processes
  that are stuck.
  """

  # A watch list entry: the worker name (label), its process ID (pid) and
  # the object used to query and control the process (process).
  PROCESS_LABEL = collections.namedtuple('process_label', 'label pid process')

  def __init__(self, show_memory_usage=False):
    """Initialize the foreman process.

    Args:
      show_memory_usage: Optional boolean value to indicate memory information
                         should be included in logging. The default is false.
    """
    # Most recent non-empty status dictionary seen per PID.
    self._last_status_dict = {}
    self._process_information = process_info.ProcessInfo()
    # The watch list: a list of PROCESS_LABEL instances.
    self._process_labels = []
    self._processing_done = False
    self._show_memory_usage = show_memory_usage

  @property
  def labels(self):
    """Return a list of all currently watched labels."""
    return self._process_labels

  @property
  def number_of_processes_in_watch_list(self):
    """Return the number of processes in the watch list."""
    return len(self._process_labels)

  def CheckStatus(self, label=None):
    """Checks status of either a single process or all from the watch list.

    Args:
      label: A process label (instance of PROCESS_LABEL), if not provided
             all processes from the watch list are checked. Defaults to None.
    """
    if label is not None:
      self._CheckStatus(label)
      return

    # Iterate over a snapshot: _CheckStatus may remove entries from the
    # watch list, and removing from a list while iterating over it makes
    # the loop skip the entry that follows each removed one.
    for process_label in list(self._process_labels):
      self._CheckStatus(process_label)

  def GetLabel(self, name=None, pid=None):
    """Return a label if found using either name or PID value.

    The name is tried first; the PID is only consulted when no label with
    a matching name was found (or no name was given).

    Args:
      name: String value that should match an already existing label.
      pid: A process ID (PID) value for a process that is monitored.

    Returns:
      A label (instance of PROCESS_LABEL) if found. If neither name
      nor pid value is given or the process does not exist a None value
      will be returned.
    """
    if name is not None:
      for process_label in self._process_labels:
        if process_label.label == name:
          return process_label

    if pid is not None:
      for process_label in self._process_labels:
        if process_label.pid == pid:
          return process_label

    return None

  def MonitorWorker(self, label=None, pid=None, name=None):
    """Starts monitoring a worker by adding it to the monitor list.

    This function requires either a label to be set or a PID and a process
    name. If the label is empty or if both a PID and a name is not provided
    the function does nothing, as in no process is added to the list of
    workers to monitor (and no indication).

    A worker that is already in the monitor list is not added a second time.

    Args:
      label: A process label (instance of PROCESS_LABEL), if not provided
             then a pid and a name is required. Defaults to None (if None
             then both a pid and name have to be provided).
      pid: The process ID (PID) of the worker that should be added to the
           monitor list. This is only required if label is not provided.
           Defaults to None. This is only used if label is set to None, in
           which case it has to be set.
      name: The name of the worker process, only required if label is not
            provided. Defaults to None, only used if label is set to None,
            in which case it has to be set.
    """
    if label is None:
      if pid is None or name is None:
        return

      # A freshly built label wraps a new process object, so comparing it
      # against the stored labels is not a reliable duplicate check. Match
      # on name and PID instead.
      if self._FindLabel(name, pid) is not None:
        return

      label = self.PROCESS_LABEL(name, pid, process_info.ProcessInfo(pid=pid))

    if not label:
      return

    if label not in self._process_labels:
      self._process_labels.append(label)

  def StopMonitoringWorker(self, label=None, pid=None, name=None):
    """Stop monitoring a particular worker and remove it from monitor list.

    The purpose of this function is to remove a worker from the list of
    monitored workers. In order to do that the function requires either a
    label or a pid and a name. Nothing happens if the worker is not in the
    monitor list.

    Args:
      label: A process label (instance of PROCESS_LABEL). Defaults to None, and
             so then a pid and name are required.
      pid: The process ID (PID) of the worker that should no longer be
           monitored. This is only required if label is not provided and
           defaults to None.
      name: The name of the worker process, defaults to None and is only
            required if label is not set.
    """
    if label is None:
      if pid is None or name is None:
        return

      # Look up the stored label rather than building a new one: a new
      # label wraps a new process object and therefore may never compare
      # equal to the entry that is actually in the watch list.
      label = self._FindLabel(name, pid)
      if label is None:
        return

    if label not in self._process_labels:
      return

    self._process_labels.remove(label)
    logging.info(
        u'{0:s} [{1:d}] has been removed from foreman monitoring.'.format(
            label.label, label.pid))

  def SignalEndOfProcessing(self):
    """Indicate that processing is done.

    Returns:
      Always True.
    """
    self._processing_done = True
    # TODO: Reconsider this as an info signal. Should this not be moved to
    # a debug one?
    logging.info(
        u'Foreman received a signal indicating that processing is completed.')

    # This function may be called via RPC functions that expects a value to be
    # returned.
    return True

  def TerminateProcess(self, label=None, pid=None, name=None):
    """Terminate a process, even if it is not in the watch list.

    The watch list is searched first by PID and then by name. Only when no
    watched process matches, and both a pid and a name were given, is a new
    label created for the process so that it can be terminated.

    Args:
      label: A process label (instance of PROCESS_LABEL), if not provided
             then a pid and a name is required. It defaults to None, in which
             case you need to provide a pid and/or a name.
      pid: The process ID (PID) of the worker. This is only required if label
           is not provided and defaults to None.
      name: The name of the worker process, only required if label is not
            provided and defaults to None.
    """
    if label is not None:
      self._TerminateProcess(label)
      return

    if pid is not None:
      for process_label in self._process_labels:
        if process_label.pid == pid:
          self._TerminateProcess(process_label)
          return

    if name is not None:
      for process_label in self._process_labels:
        if process_label.label == name:
          self._TerminateProcess(process_label)
          return

    # If we reach here the process is not in our watch list.
    if pid is not None and name is not None:
      process_label = self.PROCESS_LABEL(
          name, pid, process_info.ProcessInfo(pid=pid))
      self._TerminateProcess(process_label)

  def _FindLabel(self, name, pid):
    """Return the watched label matching both a name and a PID.

    Args:
      name: The name of the worker process.
      pid: The process ID (PID) of the worker.

    Returns:
      The first label (instance of PROCESS_LABEL) in the watch list whose
      label and pid attributes both match, or None if there is none.
    """
    for process_label in self._process_labels:
      if process_label.label == name and process_label.pid == pid:
        return process_label

    return None

  def _CheckStatus(self, label):
    """Check status for a single process from the watch list.

    This function will take a single label, which describes a worker process
    and check if it is alive, call the appropriate functions to log down
    information extracted from the worker and if a process is no longer alive
    and processing has been marked as done, it will remove the worker from
    the list of monitored workers. This function is also responsible for
    killing or terminating a process that is alive and hanging, or not alive
    while it should be alive.

    In the future this function will also be responsible for restarting
    a worker, or signalling the engine that it needs to spin up a new worker
    in the case of a worker dying or being in an effective zombie state.

    Args:
      label: A process label (instance of PROCESS_LABEL).
    """
    if label not in self._process_labels:
      return

    process = label.process

    if process.IsAlive():
      status_dict = process.GetProcessStatus()
      if not status_dict and not self._processing_done:
        logging.warning((
            u'Unable to connect to RPC socket to: {0:s} at '
            u'http://localhost:{1:d}').format(label.label, label.pid))

      if status_dict:
        self._last_status_dict[label.pid] = status_dict
        if status_dict.get('is_running', False):
          # The worker is alive and reports that it is running: log and
          # leave it in the watch list.
          self._LogWorkerInformation(label, status_dict)
          if self._show_memory_usage:
            self._LogMemoryUsage(label)
          return
        else:
          logging.info(
              u'Process {0:s} [{1:d}] has complete it\'s processing. Total of '
              u'{2:d} events extracted'.format(
                  label.label, label.pid, status_dict.get('counter', 0)))

    else:
      logging.info(u'Process {0:s} [{1:d}] is not alive.'.format(
          label.label, label.pid))

    # Reaching this point means the worker is dead, did not return a status
    # or reported that it is no longer running.

    # Check if this process should be alive.
    if self._processing_done:
      # This process exited properly and should have. Let's remove it from our
      # list of labels.
      self.StopMonitoringWorker(label=label)
      return

    # We need to terminate the process.
    # TODO: Add a function to start a new instance of a worker instead of
    # just removing and killing it.
    logging.error(
        u'Process {0:s} [{1:d}] is not functioning when it should be. '
        u'Terminating it and removing from list.'.format(
            label.label, label.pid))
    self._TerminateProcess(label)

  def _LogMemoryUsage(self, label):
    """Logs memory information gathered from a process.

    This function will take a label and call the logging infrastructure to
    log information about the process's memory information.

    Args:
      label: A process label (instance of PROCESS_LABEL).
    """
    mem_info = label.process.GetMemoryInformation()
    # NOTE(review): presumably no memory information is returned when the
    # process has already gone away - confirm against ProcessInfo. Without
    # this guard the attribute access below would raise in that case.
    if mem_info is None:
      return

    logging.info((
        u'{0:s} - RSS: {1:d}, VMS: {2:d}, Shared: {3:d}, Text: {4:d}, lib: '
        u'{5:d}, data: {6:d}, dirty: {7:d}, Memory Percent: {8:0.2f}%').format(
            label.label, mem_info.rss, mem_info.vms, mem_info.shared,
            mem_info.text, mem_info.lib, mem_info.data, mem_info.dirty,
            mem_info.percent * 100))

  def _LogWorkerInformation(self, label, status=None):
    """Log information gathered from the worker.

    Nothing is logged when no status is provided.

    Args:
      label: A process label (instance of PROCESS_LABEL).
      status: Optional status dictionary of the worker. The keys counter,
              current_file and is_running are read from it. Defaults to None.
    """
    if status:
      logging.info((
          u'{0:s} [{1:d}] - Events Extracted: {2:d} - File ({3:s}) - Running: '
          u'{4!s} <{5:s}>').format(
              label.label, label.pid, status.get('counter', -1),
              status.get('current_file', u''), status.get('is_running', False),
              unicode(label.process.status)))

  def _TerminateProcess(self, label):
    """Terminate a process given a process label.

    Attempts to terminate a process and if successful
    removes the label from the watch list.

    Args:
      label: A process label (instance of PROCESS_LABEL).
    """
    if label is None:
      return

    label.process.TerminateProcess()

    # Double check the process is dead.
    if label.process.IsAlive():
      logging.warning(u'Process {0:s} [{1:d}] is still alive.'.format(
          label.label, label.pid))
    elif label.process.status != 'exited':
      logging.warning(u'Process {0:s} [{1:d}] may still be alive.'.format(
          label.label, label.pid))
    else:
      logging.info(u'Process: {0:s} [{1:d}] has been terminated.'.format(
          label.label, label.pid))
      self.StopMonitoringWorker(label)
|