plaso-rubanetra/plaso/analysis/windows_services.py
2020-04-06 18:48:34 +02:00

268 lines
9.4 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright 2014 The Plaso Project Authors.
# Please see the AUTHORS file for details on individual authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A plugin to enable quick triage of Windows Services."""
from plaso.analysis import interface
from plaso.lib import event
from plaso.winnt import human_readable_service_enums
# Moving this import to the bottom due to complaints from certain versions of
# linters.
import yaml
class WindowsService(yaml.YAMLObject):
"""Class to represent a Windows Service."""
# This is used for comparison operations and defines attributes that should
# not be used during evaluation of whether two services are the same.
COMPARE_EXCLUDE = frozenset(['sources'])
KEY_PATH_SEPARATOR = u'\\'
# YAML attributes
yaml_tag = u'!WindowsService'
yaml_loader = yaml.SafeLoader
yaml_dumper = yaml.SafeDumper
def __init__(self, name, service_type, image_path, start_type, object_name,
source, service_dll=None):
"""Initializes a new Windows service object.
Args:
name: The name of the service
service_type: The value of the Type value of the service key.
image_path: The value of the ImagePath value of the service key.
start_type: The value of the Start value of the service key.
object_name: The value of the ObjectName value of the service key.
source: A tuple of (pathspec, Registry key) describing where the
service was found
service_dll: Optional string value of the ServiceDll value in the
service's Parameters subkey. The default is None.
Raises:
TypeError: If a tuple with two elements is not passed as the 'source'
argument.
"""
self.name = name
self.service_type = service_type
self.image_path = image_path
self.start_type = start_type
self.service_dll = service_dll
self.object_name = object_name
if isinstance(source, tuple):
if len(source) != 2:
raise TypeError(u'Source arguments must be tuple of length 2.')
# A service may be found in multiple Control Sets or Registry hives,
# hence the list.
self.sources = [source]
else:
raise TypeError(u'Source argument must be a tuple.')
self.anomalies = []
@classmethod
def FromEvent(cls, service_event):
"""Creates a Service object from an plaso event.
Args:
service_event: The event object (instance of EventObject) to create a new
Service object from.
"""
_, _, name = service_event.keyname.rpartition(
WindowsService.KEY_PATH_SEPARATOR)
service_type = service_event.regvalue.get('Type')
image_path = service_event.regvalue.get('ImagePath')
start_type = service_event.regvalue.get('Start')
service_dll = service_event.regvalue.get('ServiceDll', u'')
object_name = service_event.regvalue.get('ObjectName', u'')
if service_event.pathspec:
source = (service_event.pathspec.location, service_event.keyname)
else:
source = (u'Unknown', u'Unknown')
return cls(
name=name, service_type=service_type, image_path=image_path,
start_type=start_type, object_name=object_name,
source=source, service_dll=service_dll)
def HumanReadableType(self):
"""Return a human readable string describing the type value."""
return human_readable_service_enums.SERVICE_ENUMS['Type'].get(
self.service_type, u'{0:d}'.format(self.service_type))
def HumanReadableStartType(self):
"""Return a human readable string describing the start_type value."""
return human_readable_service_enums.SERVICE_ENUMS['Start'].get(
self.start_type, u'{0:d}'.format(self.start_type))
def __eq__(self, other_service):
"""Custom equality method so that we match near-duplicates.
Compares two service objects together and evaluates if they are
the same or close enough to be considered to represent the same service.
For two service objects to be considered the same they need to
have the the same set of attributes and same values for all their
attributes, other than those enumerated as reserved in the
COMPARE_EXCLUDE constant.
Args:
other_service: The service (instance of WindowsService) we are testing
for equality.
Returns:
A boolean value to indicate whether the services are equal.
"""
if not isinstance(other_service, WindowsService):
return False
attributes = set(self.__dict__.keys())
other_attributes = set(self.__dict__.keys())
if attributes != other_attributes:
return False
# We compare the values for all attributes, other than those specifically
# enumerated as not relevant for equality comparisons.
for attribute in attributes.difference(self.COMPARE_EXCLUDE):
if getattr(self, attribute, None) != getattr(
other_service, attribute, None):
return False
return True
class WindowsServiceCollection(object):
"""Class to hold and de-duplicate Windows Services."""
def __init__(self):
"""Initialize a collection that holds Windows Service."""
self._services = []
def AddService(self, new_service):
"""Add a new service to the list of ones we know about.
Args:
new_service: The service (instance of WindowsService) to add.
"""
for service in self._services:
if new_service == service:
# If this service is the same as one we already know about, we
# just want to add where it came from.
service.sources.append(new_service.sources[0])
return
# We only add a new object to our list if we don't have
# an identical one already.
self._services.append(new_service)
@property
def services(self):
"""Get the services in this collection."""
return self._services
class AnalyzeWindowsServicesPlugin(interface.AnalysisPlugin):
"""Provides a single list of for Windows services found in the Registry."""
NAME = 'windows_services'
# Indicate that we can run this plugin during regular extraction.
ENABLE_IN_EXTRACTION = True
ARGUMENTS = [
('--windows-services-output', {
'dest': 'windows-services-output',
'type': unicode,
'help': 'Specify how the results should be displayed. Options are '
'text and yaml.',
'action': 'store',
'default': u'text',
'choices': [u'text', u'yaml']}),]
def __init__(self, incoming_queue, options=None):
"""Initializes the Windows Services plugin
Args:
incoming_queue: A queue to read events from.
options: Optional command line arguments (instance of
argparse.Namespace). The default is None.
"""
super(AnalyzeWindowsServicesPlugin, self).__init__(incoming_queue)
self._service_collection = WindowsServiceCollection()
self.plugin_type = interface.AnalysisPlugin.TYPE_REPORT
self._output_mode = getattr(options, 'windows-services-output', u'text')
def ExamineEvent(self, analysis_context, event_object, **kwargs):
"""Analyzes an event_object and creates Windows Services as required.
At present, this method only handles events extracted from the Registry.
Args:
analysis_context: The context object analysis plugins.
event_object: The event object (instance of EventObject) to examine.
"""
# TODO: Handle event log entries here also (ie, event id 4697).
if getattr(event_object, 'data_type', None) != 'windows:registry:service':
return
else:
# Create and store the service.
service = WindowsService.FromEvent(event_object)
self._service_collection.AddService(service)
def _FormatServiceText(self, service):
"""Produces a human readable multi-line string representing the service.
Args:
service: The service (instance of WindowsService) to format.
"""
string_segments = [
service.name,
u'\tImage Path = {0:s}'.format(service.image_path),
u'\tService Type = {0:s}'.format(service.HumanReadableType()),
u'\tStart Type = {0:s}'.format(service.HumanReadableStartType()),
u'\tService Dll = {0:s}'.format(service.service_dll),
u'\tObject Name = {0:s}'.format(service.object_name),
u'\tSources:']
for source in service.sources:
string_segments.append(u'\t\t{0:s}:{1:s}'.format(source[0], source[1]))
return u'\n'.join(string_segments)
def CompileReport(self):
"""Compiles a report of the analysis.
Returns:
The analysis report (instance of AnalysisReport).
"""
report = event.AnalysisReport()
if self._output_mode == 'yaml':
lines_of_text = []
lines_of_text.append(
yaml.safe_dump_all(self._service_collection.services))
else:
lines_of_text = ['Listing Windows Services']
for service in self._service_collection.services:
lines_of_text.append(self._FormatServiceText(service))
# Separate services with a blank line.
lines_of_text.append(u'')
report.SetText(lines_of_text)
return report