#!/usr/bin/python # -*- coding: utf-8 -*- # Copyright 2012 The Plaso Project Authors. # Please see the AUTHORS file for details on individual authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """This file contains tests for the output formatter.""" import os import locale import sys import tempfile import unittest from plaso.lib import output class DummyEvent(object): """Simple class that defines a dummy event.""" def __init__(self, timestamp, entry): self.date = u'03/01/2012' try: self.timestamp = int(timestamp) except ValueError: self.timestamp = 0 self.entry = entry def EqualityString(self): return u';'.join(map(str, [self.timestamp, self.entry])) class TestOutput(output.LogOutputFormatter): """This is a test output module that provides a simple XML.""" def __init__(self, filehandle): """Fake the store.""" super(TestOutput, self).__init__(store=None, filehandle=filehandle) def StartEvent(self): self.filehandle.write(u'\n') def EventBody(self, event_object): self.filehandle.write(( u'\t{0:s}\n\t\n' u'\t{2:s}\n').format( event_object.date, event_object.timestamp, event_object.entry)) def EndEvent(self): self.filehandle.write(u'\n') def FetchEntry(self, **_): pass def Start(self): self.filehandle.write(u'\n') def End(self): self.filehandle.write(u'\n') class PlasoOutputUnitTest(unittest.TestCase): """The unit test for plaso output formatting.""" def testOutput(self): """Test a test implementation of the output formatter.""" events = [DummyEvent(123456, u'My Event Is Now!'), DummyEvent(123458, u'There is no tomorrow.'), DummyEvent(123462, u'Tomorrow is now.'), DummyEvent(123489, u'This is just some stuff to fill the line.')] lines = [] with tempfile.NamedTemporaryFile() as fh: formatter = TestOutput(fh) formatter.Start() for event_object in events: formatter.WriteEvent(event_object) formatter.End() fh.seek(0) for line in fh: lines.append(line) self.assertEquals(len(lines), 22) self.assertEquals(lines[0], u'\n') self.assertEquals(lines[1], u'\n') self.assertEquals(lines[2], u'\t03/01/2012\n') self.assertEquals(lines[3], u'\t\n') self.assertEquals(lines[4], u'\tMy Event Is Now!\n') self.assertEquals(lines[5], u'\n') self.assertEquals(lines[6], u'\n') self.assertEquals(lines[7], u'\t03/01/2012\n') self.assertEquals(lines[8], u'\t\n') self.assertEquals(lines[9], u'\tThere is no tomorrow.\n') self.assertEquals(lines[10], u'\n') self.assertEquals(lines[11], u'\n') self.assertEquals(lines[-1], u'\n') def testOutputList(self): """Test listing up all available registered modules.""" module_seen = False for name, description in output.ListOutputFormatters(): if 'TestOutput' in name: module_seen = True self.assertEquals(description, ( u'This is a test output module that provides a simple XML.')) self.assertTrue(module_seen) class EventBufferTest(unittest.TestCase): """Few unit tests for the EventBuffer class.""" def testFlush(self): """Test to ensure we empty our buffers and sends to output properly.""" with tempfile.NamedTemporaryFile() as fh: def CheckBufferLength(event_buffer, expected): if not event_buffer.check_dedups: expected = 0 # pylint: disable=protected-access self.assertEquals(len(event_buffer._buffer_dict), expected) formatter = TestOutput(fh) event_buffer = output.EventBuffer(formatter, False) event_buffer.Append(DummyEvent(123456, u'Now is now')) CheckBufferLength(event_buffer, 1) # Add three events. event_buffer.Append(DummyEvent(123456, u'OMG I AM DIFFERENT')) event_buffer.Append(DummyEvent(123456, u'Now is now')) event_buffer.Append(DummyEvent(123456, u'Now is now')) CheckBufferLength(event_buffer, 2) event_buffer.Flush() CheckBufferLength(event_buffer, 0) event_buffer.Append(DummyEvent(123456, u'Now is now')) event_buffer.Append(DummyEvent(123456, u'Now is now')) event_buffer.Append(DummyEvent(123456, u'Different again :)')) CheckBufferLength(event_buffer, 2) event_buffer.Append(DummyEvent(123457, u'Now is different')) CheckBufferLength(event_buffer, 1) class OutputFilehandleTest(unittest.TestCase): """Few unit tests for the OutputFilehandle.""" def setUp(self): self.preferred_encoding = locale.getpreferredencoding() def _GetLine(self): # Time, Þorri allra landsmanna hlýddu á atburðinn. return ('Time, \xc3\x9eorri allra landsmanna hl\xc3\xbdddu \xc3\xa1 ' 'atbur\xc3\xb0inn.\n').decode('utf-8') def testFilePath(self): temp_path = '' with tempfile.NamedTemporaryFile(delete=True) as temp_file: temp_path = temp_file.name with output.OutputFilehandle(self.preferred_encoding) as fh: fh.Open(path=temp_path) fh.WriteLine(self._GetLine()) line_read = u'' with open(temp_path, 'rb') as output_file: line_read = output_file.read() os.remove(temp_path) self.assertEquals(line_read, self._GetLine().encode('utf-8')) def testStdOut(self): with output.OutputFilehandle(self.preferred_encoding) as fh: fh.Open(sys.stdout) try: fh.WriteLine(self._GetLine()) self.assertTrue(True) except (UnicodeEncodeError, UnicodeDecodeError): self.assertTrue(False) if __name__ == '__main__': unittest.main()