plaso-rubanetra/plaso/frontend/psort_test.py
2020-04-06 18:48:34 +02:00

198 lines
6.1 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright 2012 The Plaso Project Authors.
# Please see the AUTHORS file for details on individual authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the psort front-end."""
import os
import StringIO
import unittest
from plaso.formatters import interface as formatters_interface
from plaso.formatters import manager as formatters_manager
from plaso.frontend import psort
from plaso.frontend import test_lib
from plaso.lib import event
from plaso.lib import output
from plaso.lib import pfilter
from plaso.lib import storage
from plaso.lib import timelib_test
class TestEvent1(event.EventObject):
DATA_TYPE = 'test:psort:1'
def __init__(self):
super(TestEvent1, self).__init__()
self.timestamp = 123456
class TestEvent2(event.EventObject):
DATA_TYPE = 'test:psort:2'
def __init__(self, timestamp):
super(TestEvent2, self).__init__()
self.timestamp = timestamp
self.timestamp_desc = 'Last Written'
self.parser = 'TestEvent'
self.display_name = '/dev/none'
self.filename = '/dev/none'
self.some = u'My text dude.'
self.var = {'Issue': False, 'Closed': True}
class TestEvent2Formatter(formatters_interface.EventFormatter):
DATA_TYPE = 'test:psort:2'
FORMAT_STRING = 'My text goes along: {some} lines'
SOURCE_SHORT = 'LOG'
SOURCE_LONG = 'None in Particular'
class TestFormatter(output.LogOutputFormatter):
"""Dummy formatter."""
def FetchEntry(self, store_number=-1, store_index=-1):
return self.store.GetSortedEntry()
def Start(self):
self.filehandle.write((
'date,time,timezone,MACB,source,sourcetype,type,user,host,'
'short,desc,version,filename,inode,notes,format,extra\n'))
def EventBody(self, event_object):
"""Writes the event body.
Args:
event_object: The event object (instance of EventObject).
"""
event_formatter = formatters_manager.EventFormatterManager.GetFormatter(
event_object)
msg, _ = event_formatter.GetMessages(event_object)
source_short, source_long = event_formatter.GetSources(event_object)
self.filehandle.write(u'{0:s}/{1:s} {2:s}\n'.format(
source_short, source_long, msg))
class TestEventBuffer(output.EventBuffer):
"""A test event buffer."""
def __init__(self, store, formatter=None):
self.record_count = 0
self.store = store
if not formatter:
formatter = TestFormatter(store)
super(TestEventBuffer, self).__init__(formatter, False)
def Append(self, event_object):
self._buffer_dict[event_object.EqualityString()] = event_object
self.record_count += 1
def Flush(self):
for event_object_key in self._buffer_dict:
self.formatter.EventBody(self._buffer_dict[event_object_key])
self._buffer_dict = {}
def End(self):
pass
class PsortFrontendTest(test_lib.FrontendTestCase):
"""Tests for the psort front-end."""
def setUp(self):
"""Setup sets parameters that will be reused throughout this test."""
self._front_end = psort.PsortFrontend()
# TODO: have sample output generated from the test.
self._test_file = os.path.join(self._TEST_DATA_PATH, 'psort_test.out')
self.first = timelib_test.CopyStringToTimestamp('2012-07-24 21:45:24')
self.last = timelib_test.CopyStringToTimestamp('2016-11-18 01:15:43')
def testReadEntries(self):
"""Ensure returned EventObjects from the storage are within timebounds."""
timestamp_list = []
pfilter.TimeRangeCache.ResetTimeConstraints()
pfilter.TimeRangeCache.SetUpperTimestamp(self.last)
pfilter.TimeRangeCache.SetLowerTimestamp(self.first)
storage_file = storage.StorageFile(self._test_file, read_only=True)
storage_file.SetStoreLimit()
event_object = storage_file.GetSortedEntry()
while event_object:
timestamp_list.append(event_object.timestamp)
event_object = storage_file.GetSortedEntry()
self.assertEquals(len(timestamp_list), 8)
self.assertTrue(
timestamp_list[0] >= self.first and timestamp_list[-1] <= self.last)
storage_file.Close()
def testOutput(self):
"""Testing if psort can output data."""
events = []
events.append(TestEvent2(5134324321))
events.append(TestEvent2(2134324321))
events.append(TestEvent2(9134324321))
events.append(TestEvent2(15134324321))
events.append(TestEvent2(5134324322))
events.append(TestEvent2(5134024321))
output_fd = StringIO.StringIO()
with test_lib.TempDirectory() as dirname:
temp_file = os.path.join(dirname, 'plaso.db')
storage_file = storage.StorageFile(temp_file, read_only=False)
pfilter.TimeRangeCache.ResetTimeConstraints()
storage_file.SetStoreLimit()
storage_file.AddEventObjects(events)
storage_file.Close()
storage_file = storage.StorageFile(temp_file)
with storage_file:
storage_file.store_range = [1]
formatter = TestFormatter(storage_file, output_fd)
event_buffer = TestEventBuffer(storage_file, formatter)
psort.ProcessOutput(event_buffer, formatter, None)
event_buffer.Flush()
lines = []
for line in output_fd.getvalue().split('\n'):
if line == '.':
continue
if line:
lines.append(line)
# One more line than events (header row).
self.assertEquals(len(lines), 7)
self.assertTrue('My text goes along: My text dude. lines' in lines[2])
self.assertTrue('LOG/' in lines[2])
self.assertTrue('None in Particular' in lines[2])
self.assertEquals(lines[0], (
'date,time,timezone,MACB,source,sourcetype,type,user,host,short,desc,'
'version,filename,inode,notes,format,extra'))
if __name__ == '__main__':
unittest.main()