1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
use std::collections::BTreeMap;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::Arc;
use std::marker::PhantomData;

use storage::compression::{VByteEncoded, VByteDecoder};
use storage::{ByteEncodable, ByteDecodable};
use utils::persistence::Persistent;

use storage::{Result, StorageError, Storage};

// Name of the file (inside the storage directory) that receives one
// vbyte-encoded `(id delta, length)` record per stored item.
const ENTRIES_FILENAME: &'static str = "entries.bin";
// Name of the file (inside the storage directory) that receives the encoded
// bytes of the items themselves, one item after the other.
const DATA_FILENAME: &'static str = "data.bin";
// Every file an `FsStorage` keeps in its directory; handed out by
// `Persistent::associated_files`.
const ASSOCIATED_FILES: &'static [&'static str; 2] = &[ENTRIES_FILENAME, DATA_FILENAME];

/// Stores encoded items in files inside a directory so that they can be
/// retrieved by id and loaded again later.
///
/// Item bytes are appended to a data file; a second file records, per stored
/// item, the id (as a delta to the previously stored id) and the byte length,
/// which is enough to rebuild the in-memory index on `load`.
pub struct FsStorage<TItem> {
    // In-memory index: for every id, the byte offset of the item in the data
    // file and the length of its encoded form.
    entries: BTreeMap<u64, (u64 /* offset */, u32 /* length */)>,
    // Handle to the entries file; one record is appended per `store` call.
    persistent_entries: File,
    // Handle to the data file; written by `store`, read by `get`.
    data: File,
    // Byte offset in the data file at which the next item will be placed
    // (the sum of the lengths of all items stored so far).
    current_offset: u64,
    // Id passed to the most recent `store` (0 for a fresh storage); base for
    // the delta written to the entries file.
    current_id: u64,
    // Ties the storage to its item type without holding a value of it.
    _item_type: PhantomData<TItem>,
}

impl<TItem> Persistent for FsStorage<TItem> {
    /// Creates a new and empty instance of FsStorage which can be loaded
    /// afterwards
    fn create(path: &Path) -> Result<Self> {
        assert!(path.is_dir(),
                "FsStorage::create expects a directory not a file!");
        Ok(FsStorage {
            current_offset: 0,
            current_id: 0,
            entries: BTreeMap::new(),
            persistent_entries: try!(OpenOptions::new()
                .write(true)
                .create(true)
                .truncate(true)
                .open(path.join(ENTRIES_FILENAME))),
            data: try!(OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(true)
                .open(path.join(DATA_FILENAME))),
            _item_type: PhantomData,
        })
    }

    /// Reads a FsStorage from an previously populated folder.
    fn load(path: &Path) -> Result<Self> {
        // Read from entry file to BTreeMap.
        let mut entries = BTreeMap::new();
        // 1. Open file and pass it to the decoder
        let entries_file = try!(OpenOptions::new().read(true).open(path.join(ENTRIES_FILENAME)));
        let mut decoder = VByteDecoder::new(entries_file);
        // 2. Decode entries and write them to BTreeMap
        let mut current_id: u64 = 0;
        let mut current_offset: u64 = 0;
        while let Some((id, len)) = decode_entry(&mut decoder) {
            current_id += id as u64;
            entries.insert(current_id, (current_offset, len));
            current_offset += len as u64;
        }

        Ok(FsStorage {
            current_id: current_id,
            current_offset: current_offset,
            entries: entries,
            persistent_entries: try!(OpenOptions::new()
                .append(true)
                .open(path.join(ENTRIES_FILENAME))),
            data: try!(OpenOptions::new()
                .read(true)
                .append(true)
                .open(path.join(DATA_FILENAME))),
            _item_type: PhantomData,
        })
    }

    fn associated_files() -> &'static [&'static str] {
        ASSOCIATED_FILES
    }
}




impl<TItem: ByteDecodable + ByteEncodable + Sync + Send> Storage<TItem> for FsStorage<TItem> {

    fn len(&self) -> usize {
        self.entries.len()
    }
    
    fn get(&self, id: u64) -> Result<Arc<TItem>> {
        // TODO: Think through this once more. Now with the new Read approach in ByteDecodable
        if let Some(item_position) = self.entries.get(&id) {
            // Get filehandle
            let mut f = self.data.try_clone().unwrap();
            // Seek to position of item
            f.seek(SeekFrom::Start(item_position.0)).unwrap();
            let mut bytes = vec![0; item_position.1 as usize];
            // Read all bytes
            f.read_exact(&mut bytes).unwrap();
            // Decode item
            let item = TItem::decode(&mut bytes.as_slice()).unwrap();
            Ok(Arc::new(item))
        } else {
            Err(StorageError::KeyNotFound)
        }
    }

    fn store(&mut self, id: u64, data: TItem) -> Result<()> {

        // Encode the data
        let bytes = data.encode();
        // Append it to the file
        if let Err(e) = self.data.write_all(&bytes) {
            return Err(StorageError::WriteError(Some(e)));
        }
        // And save the offset and the number of bytes written for later recovery
        self.entries.insert(id, (self.current_offset, bytes.len() as u32));
        // Also write the id, offset and number of bytes written to file for persistence
        let entry_bytes = encode_entry(self.current_id, id, bytes.len() as u32)?;
        if let Err(e) = self.persistent_entries.write_all(&entry_bytes) {
            return Err(StorageError::WriteError(Some(e)));
        }

        // Update id and offset
        self.current_id = id;
        self.current_offset += bytes.len() as u64;
        Ok(())
    }
}


fn encode_entry(current_id: u64, id: u64, length: u32) -> Result<Vec<u8>> {
    let mut bytes: Vec<u8> = Vec::new();
    VByteEncoded::new((id-current_id) as usize).write_to(&mut bytes)?;
    VByteEncoded::new(length as usize).write_to(&mut bytes)?;
    Ok(bytes)
}

/// Reads one `(id delta, length)` record from `decoder`.
///
/// Returns `None` as soon as the decoder yields no further number, i.e. at
/// the end of the input or when a record is incomplete.
///
/// The delta is returned as `u64` because it is encoded from the difference
/// of two `u64` ids; narrowing it to `u32` would silently corrupt ids that
/// are more than `u32::max_value()` apart.
fn decode_entry<R: Read>(decoder: &mut VByteDecoder<R>) -> Option<(u64, u32)> {
    let delta_id = try_option!(decoder.next()) as u64;
    let length = try_option!(decoder.next()) as u32;

    Some((delta_id, length))
}




#[cfg(test)]
mod tests {
    use super::*;

    use test_utils::create_test_dir;
    use utils::persistence::Persistent;
    use storage::{Storage, StorageError};

    /// Items stored under different ids can be read back independently.
    #[test]
    fn basic() {
        let first: u32 = 15;
        let second: u32 = 32;
        let dir = create_test_dir("fs_storage_basic");
        let mut storage = FsStorage::create(&dir).unwrap();

        assert!(storage.store(0, first).is_ok());
        assert_eq!(*storage.get(0).unwrap(), first);

        assert!(storage.store(1, second).is_ok());
        assert_eq!(*storage.get(1).unwrap(), second);

        // Storing the second item must not have affected the first one.
        assert!(*storage.get(0).unwrap() != second);
        assert_eq!(*storage.get(0).unwrap(), first);
    }

    /// Asking for an id that was never stored yields `KeyNotFound`.
    #[test]
    fn not_found() {
        let first: u32 = 15;
        let second: u32 = 32;
        let dir = create_test_dir("fs_storage_not_found");
        let mut storage = FsStorage::create(&dir).unwrap();
        assert!(storage.store(0, first).is_ok());
        assert!(storage.store(1, second).is_ok());

        let key_missing = match storage.get(2).err().unwrap() {
            StorageError::KeyNotFound => true,
            _ => false,
        };
        assert!(key_missing);
    }

    /// Items survive dropping the storage and loading it again, and a loaded
    /// storage accepts further items.
    #[test]
    fn persistence() {
        let first: usize = 1556;
        let second: usize = 235425354;
        let third: usize = 234543463709865987;
        let dir = create_test_dir("fs_storage_persistence");

        // Populate a fresh storage and drop it.
        {
            let mut created: FsStorage<usize> = FsStorage::create(&dir).unwrap();
            assert!(created.store(0, first).is_ok());
            assert!(created.store(1, second).is_ok());
        }

        // Load it, check the old items and add another one.
        {
            let mut reloaded: FsStorage<usize> = FsStorage::load(&dir).unwrap();
            assert_eq!(*reloaded.get(0).unwrap(), first);
            assert_eq!(*reloaded.get(1).unwrap(), second);
            assert!(reloaded.store(2, third).is_ok());
            assert_eq!(*reloaded.get(2).unwrap(), third);
        }

        // Load once more: all three items must be there.
        {
            let reloaded_again: FsStorage<usize> = FsStorage::load(&dir).unwrap();
            assert_eq!(*reloaded_again.get(0).unwrap(), first);
            assert_eq!(*reloaded_again.get(1).unwrap(), second);
            assert_eq!(*reloaded_again.get(2).unwrap(), third);
        }
    }
}