plaso-rubanetra/plaso/frontend/plasm_test.py
2020-04-06 18:48:34 +02:00

196 lines
6.6 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright 2013 The Plaso Project Authors.
# Please see the AUTHORS file for details on individual authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the plasm front-end."""
import os
import shutil
import tempfile
import unittest
from plaso.engine import queue
from plaso.frontend import plasm
from plaso.frontend import test_lib
from plaso.lib import event
from plaso.lib import pfilter
from plaso.lib import storage
from plaso.multi_processing import multi_process
class TestEvent(event.EventObject):
  """Event object with fixed attributes, used to populate the test storage.

  Every instance reports 'Last Written' as its timestamp description and
  'TestEvent' as its parser; only the timestamp, filename and stuff values
  vary between instances.
  """

  DATA_TYPE = 'test:plasm:1'

  def __init__(self, timestamp, filename='/dev/null', stuff='bar'):
    """Initializes the test event.

    Args:
      timestamp: the timestamp value stored on the event.
      filename: optional filename stored on the event and used to build
                the display name. The default is '/dev/null'.
      stuff: optional arbitrary value stored on the event. The default
             is 'bar'.
    """
    super(TestEvent, self).__init__()
    self.timestamp = timestamp
    self.filename = filename
    self.timestamp_desc = 'Last Written'
    self.parser = 'TestEvent'
    # The display name is the filename prefixed with 'fake:'.
    self.display_name = 'fake:{}'.format(filename)
    self.stuff = stuff
class PlasmTest(test_lib.FrontendTestCase):
  """Tests for the plasm front-end."""

  def _WriteTagFile(self, path, lines):
    """Writes a tagging input file.

    Args:
      path: the path of the tagging input file to create.
      lines: a list of strings that are joined with newlines and written
             to the file unchanged.
    """
    # The context manager closes the file even if the write raises.
    with open(path, 'wb') as tag_input_file:
      tag_input_file.write('\n'.join(lines))

  def setUp(self):
    """Sets up the objects used throughout the test.

    Creates a temporary directory holding a tagging input file and a
    storage file that contains five TestEvent objects.
    """
    self._temp_directory = tempfile.mkdtemp()
    self._storage_filename = os.path.join(self._temp_directory, 'plaso.db')
    self._tag_input_filename = os.path.join(self._temp_directory, 'input1.tag')

    self._WriteTagFile(self._tag_input_filename, [
        'Test Tag',
        ' filename contains \'/tmp/whoaaaa\'',
        ' parser is \'TestEvent\' and stuff is \'dude\''])

    pfilter.TimeRangeCache.ResetTimeConstraints()

    # TODO: add upper queue limit.
    test_queue = multi_process.MultiProcessingQueue()
    test_queue_producer = queue.ItemQueueProducer(test_queue)
    test_queue_producer.ProduceItems([
        TestEvent(0),
        TestEvent(1000),
        TestEvent(2000000, '/tmp/whoaaaaa'),
        TestEvent(2500000, '/tmp/whoaaaaa'),
        TestEvent(5000000, '/tmp/whoaaaaa', 'dude')])
    test_queue_producer.SignalEndOfInput()

    # Drain the queue into the storage file before opening it for reading.
    storage_writer = storage.StorageFileWriter(
        test_queue, self._storage_filename)
    storage_writer.WriteEventObjects()

    self._storage_file = storage.StorageFile(self._storage_filename)
    self._storage_file.SetStoreLimit()

  def tearDown(self):
    """Cleans up the objects used throughout the test."""
    # Errors are ignored so a failed removal does not mask the test result.
    shutil.rmtree(self._temp_directory, ignore_errors=True)

  def testTagParsing(self):
    """Test if plasm can parse Tagging Input files."""
    tags = plasm.ParseTaggingFile(self._tag_input_filename)
    self.assertEqual(len(tags), 1)
    self.assertIn('Test Tag', tags)
    # Both conditions listed under the tag are expected to be kept.
    self.assertEqual(len(tags['Test Tag']), 2)

  def testInvalidTagParsing(self):
    """Test what happens when Tagging Input files contain invalid conditions."""
    tag_input_filename = os.path.join(self._temp_directory, 'input2.tag')
    self._WriteTagFile(tag_input_filename, [
        'Invalid Tag', ' my hovercraft is full of eels'])

    tags = plasm.ParseTaggingFile(tag_input_filename)
    self.assertEqual(len(tags), 1)
    self.assertIn('Invalid Tag', tags)
    # The tag itself is kept, but without any conditions.
    self.assertEqual(len(tags['Invalid Tag']), 0)

  def testMixedValidityTagParsing(self):
    """Tagging Input file contains a mix of valid and invalid conditions."""
    tag_input_filename = os.path.join(self._temp_directory, 'input3.tag')
    self._WriteTagFile(tag_input_filename, [
        'Semivalid Tag', ' filename contains \'/tmp/whoaaaa\'',
        ' Yandelavasa grldenwi stravenka'])

    tags = plasm.ParseTaggingFile(tag_input_filename)
    self.assertEqual(len(tags), 1)
    self.assertIn('Semivalid Tag', tags)
    # Only one of the two conditions is expected to survive parsing.
    self.assertEqual(len(tags['Semivalid Tag']), 1)

  def testIteratingOverPlasoStore(self):
    """Tests the plaso storage iterator"""
    counter = sum(
        1 for _ in plasm.EventObjectGenerator(self._storage_file, quiet=True))
    self.assertEqual(counter, 5)

    # Reopen the storage file and repeat the iteration with quiet disabled;
    # the number of events must be the same.
    self._storage_file.Close()
    pfilter.TimeRangeCache.ResetTimeConstraints()
    self._storage_file = storage.StorageFile(self._storage_filename)
    self._storage_file.SetStoreLimit()

    counter = sum(
        1 for _ in plasm.EventObjectGenerator(self._storage_file, quiet=False))
    self.assertEqual(counter, 5)

  def testTaggingEngine(self):
    """Tests the Tagging engine's functionality."""
    self.assertFalse(self._storage_file.HasTagging())

    tagging_engine = plasm.TaggingEngine(
        self._storage_filename, self._tag_input_filename, quiet=True)
    tagging_engine.Run()

    # Open a fresh handle so the check reflects what the engine wrote.
    storage_file = storage.StorageFile(self._storage_filename)
    self.assertTrue(storage_file.HasTagging())

    count = 0
    for tag_event in storage_file.GetTagging():
      count += 1
      self.assertEqual(tag_event.tags, ['Test Tag'])

    # Three of the five stored events have a filename that contains
    # '/tmp/whoaaaa'.
    self.assertEqual(count, 3)
    storage_file.Close()

  def testGroupingEngineUntagged(self):
    """Grouping engine should do nothing if dealing with untagged storage."""
    storage_file = storage.StorageFile(self._storage_filename, read_only=False)
    grouping_engine = plasm.GroupingEngine()
    grouping_engine.Run(storage_file, quiet=True)
    storage_file.Close()

    storage_file = storage.StorageFile(self._storage_filename, read_only=True)
    self.assertFalse(storage_file.HasGrouping())
    storage_file.Close()

  def testGroupingEngine(self):
    """Tests the Grouping engine's functionality."""
    pfilter.TimeRangeCache.ResetTimeConstraints()

    # The storage file is tagged first, then grouped.
    tagging_engine = plasm.TaggingEngine(
        self._storage_filename, self._tag_input_filename, quiet=True)
    tagging_engine.Run()

    storage_file = storage.StorageFile(self._storage_filename, read_only=False)
    grouping_engine = plasm.GroupingEngine()
    grouping_engine.Run(storage_file, quiet=True)
    storage_file.Close()

    storage_file = storage.StorageFile(self._storage_filename, read_only=True)
    storage_file.SetStoreLimit()
    self.assertTrue(storage_file.HasGrouping())

    count = 0
    for group_event in storage_file.GetGrouping():
      count += 1
      self.assertEqual(group_event.category, 'Test Tag')

    self.assertEqual(count, 2)
    storage_file.Close()
# Run the tests only when this file is executed as a script.
if __name__ == '__main__':
  unittest.main()