Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,12 @@ private int readFromStream(BlockExtendedInputStream stream,
+ " from blockGroup " + stream.getBlockID() + " index "
+ currentStreamIndex() + 1);
}

if (actualRead != expectedRead) {
throw new IOException(String.format(
"Inconsistent read for blockID=%s index=%d expectedRead=%d actualRead=%d",
stream.getBlockID(), currentStreamIndex() + 1, expectedRead, actualRead));
}
return actualRead;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
Expand All @@ -50,6 +51,7 @@
import org.apache.hadoop.security.token.Token;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
* Tests for ECBlockInputStream.
Expand Down Expand Up @@ -554,6 +556,48 @@ public void testEcPipelineRefreshFunction() {
}
}

@Test
@Timeout(value = 5, unit = TimeUnit.SECONDS)
public void testZeroByteReadThrowsBadDataLocationException() throws Exception {
repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, ONEMB);
Map<DatanodeDetails, Integer> datanodes = new LinkedHashMap<>();
for (int i = 1; i <= repConfig.getRequiredNodes(); i++) {
datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), i);
}

BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 8 * ONEMB, datanodes);
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
clientConfig.setChecksumVerify(true);

try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, null, null, streamFactory, clientConfig)) {

// Read a full stripe first to initialize and create streams in the factory
ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
int read = ecb.read(buf);
assertEquals(3 * ONEMB, read);

// Simulate the Bug: Force the underlying stream to return 0 bytes (Short Read).
// Note: If the test stub `TestBlockInputStream` does not currently have
// a method to simulate a 0-byte read, you should add a simple boolean flag
// like `simulateZeroByteRead` to that stub class, making its read() return 0.
streamFactory.getBlockStreams().get(0).setSimulateZeroByteRead(true);

buf.clear();

// Assert that instead of spinning infinitely, the short read (0 bytes)
// immediately triggers the strict validation and throws BadDataLocationException.
// This exception is essential for the Proxy to initiate the Failover to Reconstruction.
BadDataLocationException e = assertThrows(BadDataLocationException.class, () -> ecb.read(buf));
List<DatanodeDetails> failed = e.getFailedLocations();

// Expect exactly 1 DN reported as failure due to the inconsistent read
assertEquals(1, failed.size());
// The failure should map to index = 1 (stream 0)
assertEquals(1, datanodes.get(failed.get(0)));
}
}

private void validateBufferContents(ByteBuffer buf, int from, int to,
byte val) {
for (int i = from; i < to; i++) {
Expand Down Expand Up @@ -593,6 +637,7 @@ private static class TestBlockInputStream extends BlockExtendedInputStream {
private BlockID blockID;
private long length;
private boolean throwException = false;
private boolean simulateZeroByteRead = false;
private static final byte EOF = -1;

@SuppressWarnings("checkstyle:parameternumber")
Expand All @@ -610,6 +655,10 @@ public void setThrowException(boolean shouldThrow) {
this.throwException = shouldThrow;
}

public void setSimulateZeroByteRead(boolean simulateZeroByteRead) {
this.simulateZeroByteRead = simulateZeroByteRead;
}

@Override
public BlockID getBlockID() {
return blockID;
Expand All @@ -636,6 +685,10 @@ public int read(ByteBuffer buf) throws IOException {
throw new IOException("Simulated exception");
}

if (simulateZeroByteRead) {
return 0;
}

int toRead = Math.min(buf.remaining(), (int)getRemaining());
for (int i = 0; i < toRead; i++) {
buf.put(dataVal);
Expand Down