196 lines
6.6 KiB
Python
196 lines
6.6 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: utf-8 -*-
|
|
# Copyright 2013 The Plaso Project Authors.
|
|
# Please see the AUTHORS file for details on individual authors.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""Tests for the plasm front-end."""
|
|
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
import unittest
|
|
|
|
from plaso.engine import queue
|
|
from plaso.frontend import plasm
|
|
from plaso.frontend import test_lib
|
|
from plaso.lib import event
|
|
from plaso.lib import pfilter
|
|
from plaso.lib import storage
|
|
from plaso.multi_processing import multi_process
|
|
|
|
|
|
class TestEvent(event.EventObject):
|
|
DATA_TYPE = 'test:plasm:1'
|
|
|
|
def __init__(self, timestamp, filename='/dev/null', stuff='bar'):
|
|
super(TestEvent, self).__init__()
|
|
self.timestamp = timestamp
|
|
self.filename = filename
|
|
self.timestamp_desc = 'Last Written'
|
|
self.parser = 'TestEvent'
|
|
self.display_name = 'fake:{}'.format(filename)
|
|
self.stuff = stuff
|
|
|
|
|
|
class PlasmTest(test_lib.FrontendTestCase):
|
|
"""Tests for the plasm front-end."""
|
|
|
|
def setUp(self):
|
|
"""Sets up the objects used throughout the test."""
|
|
self._temp_directory = tempfile.mkdtemp()
|
|
self._storage_filename = os.path.join(self._temp_directory, 'plaso.db')
|
|
self._tag_input_filename = os.path.join(self._temp_directory, 'input1.tag')
|
|
|
|
tag_input_file = open(self._tag_input_filename, 'wb')
|
|
tag_input_file.write('\n'.join([
|
|
'Test Tag',
|
|
' filename contains \'/tmp/whoaaaa\'',
|
|
' parser is \'TestEvent\' and stuff is \'dude\'']))
|
|
tag_input_file.close()
|
|
|
|
pfilter.TimeRangeCache.ResetTimeConstraints()
|
|
|
|
# TODO: add upper queue limit.
|
|
test_queue = multi_process.MultiProcessingQueue()
|
|
test_queue_producer = queue.ItemQueueProducer(test_queue)
|
|
test_queue_producer.ProduceItems([
|
|
TestEvent(0),
|
|
TestEvent(1000),
|
|
TestEvent(2000000, '/tmp/whoaaaaa'),
|
|
TestEvent(2500000, '/tmp/whoaaaaa'),
|
|
TestEvent(5000000, '/tmp/whoaaaaa', 'dude')])
|
|
test_queue_producer.SignalEndOfInput()
|
|
|
|
storage_writer = storage.StorageFileWriter(
|
|
test_queue, self._storage_filename)
|
|
storage_writer.WriteEventObjects()
|
|
|
|
self._storage_file = storage.StorageFile(self._storage_filename)
|
|
self._storage_file.SetStoreLimit()
|
|
|
|
def tearDown(self):
|
|
"""Cleans up the objects used throughout the test."""
|
|
shutil.rmtree(self._temp_directory, True)
|
|
|
|
def testTagParsing(self):
|
|
"""Test if plasm can parse Tagging Input files."""
|
|
tags = plasm.ParseTaggingFile(self._tag_input_filename)
|
|
self.assertEquals(len(tags), 1)
|
|
self.assertTrue('Test Tag' in tags)
|
|
self.assertEquals(len(tags['Test Tag']), 2)
|
|
|
|
def testInvalidTagParsing(self):
|
|
"""Test what happens when Tagging Input files contain invalid conditions."""
|
|
tag_input_filename = os.path.join(self._temp_directory, 'input2.tag')
|
|
|
|
tag_input_file = open(tag_input_filename, 'wb')
|
|
tag_input_file.write('\n'.join([
|
|
'Invalid Tag', ' my hovercraft is full of eels']))
|
|
tag_input_file.close()
|
|
|
|
tags = plasm.ParseTaggingFile(tag_input_filename)
|
|
self.assertEquals(len(tags), 1)
|
|
self.assertTrue('Invalid Tag' in tags)
|
|
self.assertEquals(len(tags['Invalid Tag']), 0)
|
|
|
|
def testMixedValidityTagParsing(self):
|
|
"""Tagging Input file contains a mix of valid and invalid conditions."""
|
|
tag_input_filename = os.path.join(self._temp_directory, 'input3.tag')
|
|
|
|
tag_input_file = open(tag_input_filename, 'wb')
|
|
tag_input_file.write('\n'.join([
|
|
'Semivalid Tag', ' filename contains \'/tmp/whoaaaa\'',
|
|
' Yandelavasa grldenwi stravenka']))
|
|
tag_input_file.close()
|
|
|
|
tags = plasm.ParseTaggingFile(tag_input_filename)
|
|
self.assertEquals(len(tags), 1)
|
|
self.assertTrue('Semivalid Tag' in tags)
|
|
self.assertEquals(len(tags['Semivalid Tag']), 1)
|
|
|
|
def testIteratingOverPlasoStore(self):
|
|
"""Tests the plaso storage iterator"""
|
|
counter = 0
|
|
for _ in plasm.EventObjectGenerator(self._storage_file, quiet=True):
|
|
counter += 1
|
|
self.assertEquals(counter, 5)
|
|
|
|
self._storage_file.Close()
|
|
|
|
pfilter.TimeRangeCache.ResetTimeConstraints()
|
|
self._storage_file = storage.StorageFile(self._storage_filename)
|
|
self._storage_file.SetStoreLimit()
|
|
|
|
counter = 0
|
|
for _ in plasm.EventObjectGenerator(self._storage_file, quiet=False):
|
|
counter += 1
|
|
self.assertEquals(counter, 5)
|
|
|
|
def testTaggingEngine(self):
|
|
"""Tests the Tagging engine's functionality."""
|
|
self.assertFalse(self._storage_file.HasTagging())
|
|
tagging_engine = plasm.TaggingEngine(
|
|
self._storage_filename, self._tag_input_filename, quiet=True)
|
|
tagging_engine.Run()
|
|
test = storage.StorageFile(self._storage_filename)
|
|
self.assertTrue(test.HasTagging())
|
|
tagging = test.GetTagging()
|
|
count = 0
|
|
for tag_event in tagging:
|
|
count += 1
|
|
self.assertEquals(tag_event.tags, ['Test Tag'])
|
|
self.assertEquals(count, 3)
|
|
|
|
def testGroupingEngineUntagged(self):
|
|
"""Grouping engine should do nothing if dealing with untagged storage."""
|
|
storage_file = storage.StorageFile(self._storage_filename, read_only=False)
|
|
grouping_engine = plasm.GroupingEngine()
|
|
grouping_engine.Run(storage_file, quiet=True)
|
|
storage_file.Close()
|
|
|
|
storage_file = storage.StorageFile(self._storage_filename, read_only=True)
|
|
|
|
self.assertFalse(storage_file.HasGrouping())
|
|
|
|
storage_file.Close()
|
|
|
|
def testGroupingEngine(self):
|
|
"""Tests the Grouping engine's functionality."""
|
|
pfilter.TimeRangeCache.ResetTimeConstraints()
|
|
tagging_engine = plasm.TaggingEngine(
|
|
self._storage_filename, self._tag_input_filename, quiet=True)
|
|
tagging_engine.Run()
|
|
|
|
storage_file = storage.StorageFile(self._storage_filename, read_only=False)
|
|
grouping_engine = plasm.GroupingEngine()
|
|
grouping_engine.Run(storage_file, quiet=True)
|
|
storage_file.Close()
|
|
|
|
storage_file = storage.StorageFile(self._storage_filename, read_only=True)
|
|
|
|
storage_file.SetStoreLimit()
|
|
self.assertTrue(storage_file.HasGrouping())
|
|
groups = storage_file.GetGrouping()
|
|
count = 0
|
|
for group_event in groups:
|
|
count += 1
|
|
self.assertEquals(group_event.category, 'Test Tag')
|
|
self.assertEquals(count, 2)
|
|
|
|
storage_file.Close()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|