355 lines
12 KiB
Python
355 lines
12 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# Copyright 2013 The Plaso Project Authors.
|
|
# Please see the AUTHORS file for details on individual authors.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""The unit tests for the generic collector object."""
|
|
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
import unittest
|
|
|
|
from dfvfs.helpers import file_system_searcher
|
|
from dfvfs.lib import definitions as dfvfs_definitions
|
|
from dfvfs.path import factory as path_spec_factory
|
|
from dfvfs.resolver import context
|
|
from dfvfs.resolver import resolver as path_spec_resolver
|
|
|
|
from plaso.engine import collector
|
|
from plaso.engine import queue
|
|
from plaso.engine import single_process
|
|
from plaso.engine import utils as engine_utils
|
|
|
|
|
|
class TempDirectory(object):
  """A temporary directory that cleans up after itself.

  Intended to be used with the 'with' statement: the directory is created
  on entry and its whole tree is removed again on exit.
  """

  def __init__(self):
    """Initializes the temporary directory object.

    No directory is created yet, the name stays empty until the context
    is entered.
    """
    super(TempDirectory, self).__init__()
    self.name = u''

  def __enter__(self):
    """Creates the temporary directory.

    Returns:
      The path of the newly created temporary directory.
    """
    directory_path = tempfile.mkdtemp()
    self.name = directory_path
    return directory_path

  def __exit__(self, unused_type, unused_value, unused_traceback):
    """Removes the temporary directory and everything inside it."""
    # Errors during removal are deliberately ignored, clean up is best effort.
    shutil.rmtree(self.name, ignore_errors=True)
|
|
|
|
|
|
class TestCollectorQueueConsumer(queue.ItemQueueConsumer):
  """Test queue consumer that keeps every path specification it consumes."""

  def __init__(self, queue_object):
    """Initializes the queue consumer.

    Args:
      queue_object: the queue object (instance of Queue).
    """
    super(TestCollectorQueueConsumer, self).__init__(queue_object)
    # The consumed path specifications, in the order they were consumed.
    self.path_specs = []

  def _ConsumeItem(self, path_spec):
    """Callback for ConsumeItems that stores the consumed item.

    Args:
      path_spec: a path specification (instance of dfvfs.PathSpec).
    """
    self.path_specs.append(path_spec)

  @property
  def number_of_path_specs(self):
    """The number of path specifications consumed so far."""
    return len(self.path_specs)

  def GetFilePaths(self):
    """Retrieves the locations of the consumed path specifications.

    Path specifications without a location attribute, or with a location
    of None, are left out.

    Returns:
      A list of the location values, in consumption order.
    """
    locations = [
        getattr(path_spec, 'location', None) for path_spec in self.path_specs]
    return [location for location in locations if location is not None]
|
|
|
|
|
|
class CollectorTestCase(unittest.TestCase):
  """Base test case for the collector tests."""

  # The test data directory, resolved against the working directory at the
  # time this module is loaded.
  _TEST_DATA_PATH = os.path.join(os.getcwd(), u'test_data')

  # Show full diff results, part of TestCase so does not follow our naming
  # conventions.
  maxDiff = None

  def _GetTestFilePath(self, path_segments):
    """Builds the path of a file inside the test data directory.

    Args:
      path_segments: the path segments inside the test data directory.

    Returns:
      A path of the test file.
    """
    # os.path.join expects the segments as separate arguments, hence the
    # combined list is unpacked in the call.
    all_segments = [self._TEST_DATA_PATH]
    all_segments.extend(path_segments)
    return os.path.join(*all_segments)
|
|
|
|
|
|
class CollectorTest(CollectorTestCase):
  """Tests for the collector."""

  def _CreateFilterFile(self, filter_expressions):
    """Writes filter expressions to a new temporary file.

    Args:
      filter_expressions: a list of byte strings, each a single filter
                          expression without the trailing newline.

    Returns:
      The path of the temporary file. The file is not removed automatically,
      the caller is expected to remove it, e.g. with _RemoveFilterFile.
    """
    # NamedTemporaryFile opens the file in binary mode by default, hence
    # byte strings are written so this works on both Python 2 and 3.
    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
      for filter_expression in filter_expressions:
        temp_file.write(filter_expression + b'\n')

    return temp_file.name

  def _RemoveFilterFile(self, filter_name):
    """Removes a temporary filter file.

    A failure to remove the file is logged as a warning and does not fail
    the test.

    Args:
      filter_name: the path of the filter file.
    """
    try:
      os.remove(filter_name)
    except (OSError, IOError) as exception:
      # The exception is formatted with !s since exception objects do not
      # support the 's' format specification on Python 3.
      logging.warning((
          u'Unable to remove temporary file: {0:s} with error: {1!s}').format(
              filter_name, exception))

  def _CollectPathSpecs(self, source_path, path_spec, filter_name=None):
    """Runs a collector and consumes the path specifications it queued.

    Args:
      source_path: the source path passed to the collector.
      path_spec: the path specification passed to the collector.
      filter_name: optional path of a filter file. If set the find
                   specifications built from the file are set as the
                   collector filter before collecting.

    Returns:
      The queue consumer (instance of TestCollectorQueueConsumer) after it
      consumed the items of the collection queue.
    """
    test_collection_queue = single_process.SingleProcessQueue()
    resolver_context = context.Context()
    test_collector = collector.Collector(
        test_collection_queue, source_path, path_spec,
        resolver_context=resolver_context)

    if filter_name is not None:
      find_specs = engine_utils.BuildFindSpecsFromFile(filter_name)
      test_collector.SetFilter(find_specs)

    test_collector.Collect()

    test_collector_queue_consumer = TestCollectorQueueConsumer(
        test_collection_queue)
    test_collector_queue_consumer.ConsumeItems()

    return test_collector_queue_consumer

  def testFileSystemCollection(self):
    """Test collection on the file system."""
    test_files = [
        self._GetTestFilePath([u'syslog.tgz']),
        self._GetTestFilePath([u'syslog.zip']),
        self._GetTestFilePath([u'syslog.bz2']),
        self._GetTestFilePath([u'wtmp.1'])]

    with TempDirectory() as dirname:
      for a_file in test_files:
        shutil.copy(a_file, dirname)

      path_spec = path_spec_factory.Factory.NewPathSpec(
          dfvfs_definitions.TYPE_INDICATOR_OS, location=dirname)

      # The collection needs to happen while the temporary directory exists.
      test_collector_queue_consumer = self._CollectPathSpecs(
          dirname, path_spec)

      self.assertEqual(test_collector_queue_consumer.number_of_path_specs, 4)

  def testFileSystemWithFilterCollection(self):
    """Test collection on the file system with a filter."""
    dirname = u'.'
    path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_OS, location=dirname)

    filter_name = self._CreateFilterFile([
        b'/test_data/testdir/filter_.+.txt',
        b'/test_data/.+evtx',
        b'/AUTHORS',
        b'/does_not_exist/some_file_[0-9]+txt'])

    # The filter file is removed even when the collection fails.
    try:
      test_collector_queue_consumer = self._CollectPathSpecs(
          dirname, path_spec, filter_name=filter_name)
    finally:
      self._RemoveFilterFile(filter_name)

    # Two files with test_data/testdir/filter_*.txt, AUTHORS
    # and test_data/System.evtx.
    self.assertEqual(test_collector_queue_consumer.number_of_path_specs, 4)

    paths = test_collector_queue_consumer.GetFilePaths()

    current_directory = os.getcwd()

    expected_path = os.path.join(
        current_directory, u'test_data', u'testdir', u'filter_1.txt')
    self.assertIn(expected_path, paths)

    expected_path = os.path.join(
        current_directory, u'test_data', u'testdir', u'filter_2.txt')
    self.assertNotIn(expected_path, paths)

    expected_path = os.path.join(
        current_directory, u'test_data', u'testdir', u'filter_3.txt')
    self.assertIn(expected_path, paths)

    expected_path = os.path.join(
        current_directory, u'AUTHORS')
    self.assertIn(expected_path, paths)

  def testImageCollection(self):
    """Test collection on a storage media image file.

    This image has two files:
      + logs/hidden.zip
      + logs/sys.tgz

    The hidden.zip file contains one file, syslog, which is the
    same for sys.tgz.

    The end results were documented as:
      + logs/hidden.zip (unchanged)
      + logs/hidden.zip:syslog (the text file extracted out)
      + logs/sys.tgz (unchanged)
      + logs/sys.tgz (read as a GZIP file, so not compressed)
      + logs/sys.tgz:syslog.gz (A GZIP file from the TAR container)
      + logs/sys.tgz:syslog.gz:syslog (the extracted syslog file)

    which would mean that the collection script collects 6 files in total.

    NOTE(review): the assertion below expects 3 path specifications, not 6.
    Confirm whether this description or the assertion is out of date.
    """
    test_file = self._GetTestFilePath([u'syslog_image.dd'])

    volume_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file)
    path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_TSK, location=u'/',
        parent=volume_path_spec)

    test_collector_queue_consumer = self._CollectPathSpecs(
        test_file, path_spec)

    self.assertEqual(test_collector_queue_consumer.number_of_path_specs, 3)

  def testImageWithFilterCollection(self):
    """Test collection on a storage media image file with a filter."""
    test_file = self._GetTestFilePath([u'ímynd.dd'])

    volume_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file)
    path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_TSK, location=u'/',
        parent=volume_path_spec)

    filter_name = self._CreateFilterFile([
        b'/a_directory/.+zip',
        b'/a_directory/another.+',
        b'/passwords.txt'])

    # The filter file is removed even when the collection fails.
    try:
      test_collector_queue_consumer = self._CollectPathSpecs(
          test_file, path_spec, filter_name=filter_name)
    finally:
      self._RemoveFilterFile(filter_name)

    self.assertEqual(test_collector_queue_consumer.number_of_path_specs, 2)

    paths = test_collector_queue_consumer.GetFilePaths()

    # path_specs[0]
    # type: TSK
    # file_path: '/a_directory/another_file'
    # container_path: 'test_data/ímynd.dd'
    # image_offset: 0
    self.assertEqual(paths[0], u'/a_directory/another_file')

    # path_specs[1]
    # type: TSK
    # file_path: '/passwords.txt'
    # container_path: 'test_data/ímynd.dd'
    # image_offset: 0
    self.assertEqual(paths[1], u'/passwords.txt')
|
|
|
|
|
|
class BuildFindSpecsFromFileTest(unittest.TestCase):
  """Tests for the BuildFindSpecsFromFile function."""

  def testBuildFindSpecsFromFile(self):
    """Tests the BuildFindSpecsFromFile function.

    A temporary filter file is written, turned into find specifications and
    these are used to search the current working directory. Building find
    specifications from a file that does not exist is expected to raise
    IOError.
    """
    filter_name = ''
    # NamedTemporaryFile opens the file in binary mode by default, hence
    # byte strings are written so this works on both Python 2 and 3.
    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
      filter_name = temp_file.name
      # 2 hits.
      temp_file.write(b'/test_data/testdir/filter_.+.txt\n')
      # A single hit.
      temp_file.write(b'/test_data/.+evtx\n')
      # A single hit.
      temp_file.write(b'/AUTHORS\n')
      temp_file.write(b'/does_not_exist/some_file_[0-9]+txt\n')
      # This should not compile properly, missing file information.
      temp_file.write(b'failing/\n')
      # This should not fail during initial loading, but fail later on.
      temp_file.write(b'bad re (no close on that parenthesis/file\n')

    # The filter file is removed even when building the find specifications
    # fails.
    try:
      find_specs = engine_utils.BuildFindSpecsFromFile(filter_name)
    finally:
      try:
        os.remove(filter_name)
      except (OSError, IOError) as exception:
        # The exception is formatted with !s since exception objects do not
        # support the 's' format specification on Python 3.
        logging.warning(
            u'Unable to remove temporary file: {0:s} with error: {1!s}'.format(
                filter_name, exception))

    self.assertEqual(len(find_specs), 4)

    dirname = u'.'
    path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_OS, location=dirname)
    file_system = path_spec_resolver.Resolver.OpenFileSystem(path_spec)
    searcher = file_system_searcher.FileSystemSearcher(
        file_system, path_spec)

    path_spec_generator = searcher.Find(find_specs=find_specs)
    self.assertIsNotNone(path_spec_generator)

    path_specs = list(path_spec_generator)
    # One evtx, one AUTHORS, two filter_*.txt files, total 4 files.
    self.assertEqual(len(path_specs), 4)

    with self.assertRaises(IOError):
      _ = engine_utils.BuildFindSpecsFromFile('thisfiledoesnotexist')
|
|
|
|
|
|
# Run the tests of this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()
|